[TOC]
Architecture
Introduction
The complete YAML file can be found in pilot-yaml.
Dockerfile
FROM istionightly/base_debug
ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]
Startup command line
# ps -ef -www|grep pilot
/usr/local/bin/pilot-discovery discovery
Command-line help
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
pilot-discovery [command]
Available Commands:
discovery Start Istio proxy discovery service
help Help about any command
request Makes an HTTP request to Pilot metrics/debug endpoint
version Prints out build version information
Help for the discovery subcommand
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.
Start Istio proxy discovery service
Usage:
pilot-discovery discovery [flags]
Flags:
-a, --appNamespace string Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
--cfConfig string Cloud Foundry config file
--clusterRegistriesConfigMap string ConfigMap map for clusters config store
--clusterRegistriesNamespace string Namespace for ConfigMap which stores clusters configs
--configDir string Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
--consulserverInterval duration Interval (in seconds) for polling the Consul service registry (default 2s)
--consulserverURL string URL for the Consul server
--disable-install-crds Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disable for highly available setups.
--discovery_cache Enable caching discovery service responses (default true)
--domain string DNS domain suffix (default "cluster.local")
--grpcAddr string Discovery service grpc address (default ":15010")
-h, --help help for discovery
--httpAddr string Discovery service HTTP address (default ":8080")
--kubeconfig string Use a Kubernetes configuration file instead of in-cluster configuration
--meshConfig string File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
--monitoringAddr string HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
-n, --namespace string Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
--plugins stringSlice comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
--profile Enable profiling via web interface host:port/debug/pprof (default true)
--registries stringSlice Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
--resync duration Controller resync interval (default 1m0s)
--secureGrpcAddr string Discovery service grpc address, with https (default ":15012")
--webhookEndpoint string Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket
Global Flags:
(omitted)
Main parameters:
Name | Default | Notes |
---|---|---|
--appNamespace | (empty) | Corresponds to oneNamespace in the Helm installation |
--configDir | (empty) | Reflects Pilot's two config sources: config files and CRDs |
--discovery_cache | true | Enables the discovery cache, which helps performance |
--domain | "cluster.local" | DNS domain suffix in Kubernetes |
--grpcAddr | :15010 | |
--httpAddr | :8080 | |
--meshConfig | "/etc/istio/config/mesh" | |
--registries | Kubernetes | |
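As the ps output above shows, the default deployment starts the process as just pilot-discovery discovery, so all of these flags effectively keep the defaults listed here; overriding one (for example --domain or --registries) means changing the discovery container's arguments in the Pilot deployment.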
Code analysis
Function entry point
discoveryCmd = &cobra.Command{
Use: "discovery",
Short: "Start Istio proxy discovery service.",
Args: cobra.ExactArgs(0),
RunE: func(c *cobra.Command, args []string) error {
// ...
// Create the stop channel for all of the servers.
stop := make(chan struct{})
// Create the server for the discovery service.
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {
return fmt.Errorf("failed to create discovery service: %v", err)
}
// Start the server
if err := discoveryServer.Start(stop); err != nil {
return fmt.Errorf("failed to start discovery service: %v", err)
}
cmd.WaitSignal(stop)
return nil
},
}
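cmd.WaitSignal blocks until the process receives a termination signal and then closes the stop channel, which shuts down everything started with that channel. A minimal sketch of that idea, assuming standard signal handling (not Istio's exact implementation):
import (
	"os"
	"os/signal"
	"syscall"
)

// waitSignal sketches what cmd.WaitSignal presumably does: block until
// SIGINT/SIGTERM arrives, then close the stop channel shared by all servers.
func waitSignal(stop chan struct{}) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	close(stop)
}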
istio.io/istio/pilot/pkg/bootstrap/server.go
For the default mesh configuration, see: https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
// ...
s := &Server{
filewatcher: filewatcher.NewWatcher(),
}
// error handling omitted
s.initKubeClient(&args) // initialize the client to the k8s cluster, s.kubeClient
s.initMesh(&args) // load the mesh config and register it with the filewatcher, /etc/istio/config/mesh
// not present in the 1.0.5 cmd; presumably added in 1.1
// serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
// load the mesh networks config and register it with the filewatcher
s.initMeshNetworks(&args)
// initMixerSan configures the mixerSAN configuration item.
// The mesh must already have been configured.
s.initMixerSan(&args)
// creates the config controller in the pilotConfig.
// ultimately creates a crd.NewController instance
s.initConfigController(&args)
/* taking kube as the example, the main flow is:
controller, err := s.makeKubeConfigController(args)
s.configController = controller
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop) // 1. the first controller to start: configController
})
*/
// creates and initializes the service controllers
s.initServiceControllers(&args)
/*
s.createK8sServiceControllers(serviceControllers, args)
--> kube.NewController(s.kubeClient, args.Config.ControllerOptions)
s.addStartFunc(func(stop <-chan struct{}) error {
go s.ServiceController.Run(stop) // 2. the second controller to start: ServiceController
return nil
})
*/
// initialize the DiscoveryService gRPC server, covered in detail below
// adds 2 funcs to startFuncs
s.initDiscoveryService(&args)
// initializes the configuration for the pilot monitoring server.
// adds 1 func to startFuncs
s.initMonitor(&args)
// starts the secret controller to watch for remote
// clusters and initialize the multicluster structures.
// mainly configures the connection to remote clusters and the namespaces to watch,
// and starts a secret controller that watches secrets labeled istio/multiCluster=true
// --clusterRegistriesConfigMap    ConfigMap map for clusters config store
// --clusterRegistriesNamespace string   Namespace for ConfigMap which stores clusters configs
// not analyzed here for now
s.initClusterRegistries(&args)
return s, nil
}
// Start launches, in order, the functions registered during NewServer
func (s *Server) Start(stop <-chan struct{}) error {
// Now start all of the components.
for _, fn := range s.startFuncs {
fn(stop)
}
}
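Each init* step in NewServer only registers work via s.addStartFunc; nothing runs until Start is called with the shared stop channel. A minimal sketch of this accumulate-then-start pattern, assuming the startFunc signature seen above:
// startFunc mirrors the signature used by s.addStartFunc above.
type startFunc func(stop <-chan struct{}) error

type server struct {
	startFuncs []startFunc
}

// addStartFunc only records the component; it is started later by Start.
func (s *server) addStartFunc(fn startFunc) {
	s.startFuncs = append(s.startFuncs, fn)
}

// Start runs every registered component with the same stop channel.
func (s *server) Start(stop <-chan struct{}) error {
	for _, fn := range s.startFuncs {
		if err := fn(stop); err != nil {
			return err
		}
	}
	return nil
}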
istio.io/api/mesh/v1alpha1/network.pb.go
The MeshNetworks struct and its documentation are as follows:
// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
// - endpoints:
// - fromRegistry: registry1 #must match secret name in kubernetes
// - fromCidr: 192.168.100.0/22 #a VM network for example
// gateways:
// - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
// port: 15443
// locality: us-east-1a
type MeshNetworks struct {
// REQUIRED: The set of networks inside this mesh. Each network should
// have a unique name and information about how to infer the endpoints in
// the network as well as the gateways associated with the network.
Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}
A simplified view of the NewServer function is as follows:
func NewServer(args PilotArgs) (*Server, error) {
// error handling omitted
s.initKubeClient(&args) // initialize the client to the k8s cluster, s.kubeClient
// load the related configs and watch them for changes
// ...
// creates the config controller in the pilotConfig.
// ultimately creates a crd.NewController instance
s.initConfigController(&args)
// s.makeKubeConfigController(args)
// s.configController.Run(stop)
// creates and initializes the service controllers
s.initServiceControllers(&args)
// kube.NewController(s.kubeClient, args.Config.ControllerOptions)
// go s.ServiceController.Run(stop)
// initialize the DiscoveryService gRPC server, covered in detail below
// adds 2 funcs to startFuncs
s.initDiscoveryService(&args)
return s, nil
}
ConfigController
// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
// initialization for the k8s case
controller, err := s.makeKubeConfigController(args)
s.configController = controller
// Defer starting the controller until after the service is created.
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop)
return nil
})
//...
// Create the config store.
s.istioConfigStore = model.MakeIstioStore(s.configController)
return nil
}
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
kubeCfgFile := s.getKubeCfgFile(args)
configClient, err := crd.NewClient(
kubeCfgFile, "",
model.IstioConfigTypes, // all of the related CRD definitions
args.Config.ControllerOptions.DomainSuffix)
if !args.Config.DisableInstallCRDs {
// register the custom CRDs
configClient.RegisterResources()
}
return crd.NewController(configClient, args.Config.ControllerOptions), nil
}
The model.IstioConfigTypes argument to crd.NewClient contains all of the related CRD definitions:
istio.io/istio/pilot/pkg/model/config.go
// IstioConfigTypes lists all Istio config types with schemas and validation
IstioConfigTypes = ConfigDescriptor{
VirtualService,
Gateway,
ServiceEntry,
DestinationRule,
EnvoyFilter,
Sidecar,
HTTPAPISpec,
HTTPAPISpecBinding,
QuotaSpec,
QuotaSpecBinding,
AuthenticationPolicy,
AuthenticationMeshPolicy,
ServiceRole,
ServiceRoleBinding,
RbacConfig,
ClusterRbacConfig,
}
crd.NewController is defined in the file istio.io/istio/pilot/pkg/config/kube/crd/controller.go:
// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)
// Queue requires a time duration for a retry delay after a handler error
out := &controller{
client: client,
queue: kube.NewQueue(1 * time.Second),
kinds: make(map[string]cacheHandler),
}
// add stores for CRD kinds
for _, schema := range client.ConfigDescriptor() {
out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
}
return out
}
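kube.NewQueue(1 * time.Second) gives the controller a serialized work queue whose duration is the retry delay after a handler error. A heavily simplified sketch of that queue idea, with hypothetical types rather than Istio's actual kube.Queue:
import "time"

// taskQueue is a hypothetical simplification of kube.Queue: informer handlers
// push tasks, a single worker runs them in order, and a failed task is retried
// after the configured delay (the 1 * time.Second passed to kube.NewQueue).
type taskQueue struct {
	tasks chan func() error
	delay time.Duration
}

func newTaskQueue(delay time.Duration) *taskQueue {
	return &taskQueue{tasks: make(chan func() error, 100), delay: delay}
}

func (q *taskQueue) Push(t func() error) { q.tasks <- t }

func (q *taskQueue) Run(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case t := <-q.tasks:
			if err := t(); err != nil {
				time.Sleep(q.delay) // retry delay after a handler error
				go q.Push(t)        // re-enqueue for another attempt
			}
		}
	}
}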
For watching the CRDs, each class of resource needs its own client connection; the current CRD groups are mainly networking, config, authentication, rbac, etc.
// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
client *Client // one connection per resource kind
queue kube.Queue
kinds map[string]cacheHandler
}
type cacheHandler struct {
informer cache.SharedIndexInformer
handler *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
// Map of apiVersion to restClient.
clientset map[string]*restClient
// domainSuffix for the config metadata
domainSuffix string
}
ServiceController
istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go
The struct is defined as follows:
type Controller struct {
domainSuffix string
client kubernetes.Interface
queue Queue
services cacheHandler
endpoints cacheHandler
nodes cacheHandler
pods *PodCache
//....
}
The NewController function is defined as follows:
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
options.WatchedNamespace, options.ResyncPeriod)
// Queue requires a time duration for a retry delay after a handler error
out := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
queue: NewQueue(1 * time.Second),
ClusterID: options.ClusterID,
XDSUpdater: options.XDSUpdater,
servicesMap: make(map[model.Hostname]*model.Service),
externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
}
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
svcInformer := sharedInformers.Core().V1().Services().Informer()
out.services = out.createCacheHandler(svcInformer, "Services")
epInformer := sharedInformers.Core().V1().Endpoints().Informer()
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
podInformer := sharedInformers.Core().V1().Pods().Informer()
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
As the code above clearly shows, the ServiceController watches the following resources in the specified namespace:
- Services
- Endpoints
- Nodes
- Pod
ServiceDiscovery
istio.io/istio/pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
// note that env holds the objects used later, including
// s.istioConfigStore and s.ServiceController (used as both ServiceDiscovery and ServiceAccounts)
environment := &model.Environment{
Mesh: s.mesh,
MeshNetworks: s.meshNetworks,
IstioConfigStore: s.istioConfigStore, // istio routing rules
ServiceDiscovery: s.ServiceController, // service list
ServiceAccounts: s.ServiceController,
MixerSAN: s.mixerSAN,
}
// Set up discovery service
discovery, err := envoy.NewDiscoveryService(
environment,
args.DiscoveryOptions,
)
s.mux = discovery.RestContainer.ServeMux
// create the gRPC server backed by envoyv2.NewDiscoveryServer
s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
environment,
istio_networking.NewConfigGenerator(args.Plugins),
s.ServiceController,
s.configController)
s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)
// ...
// create grpc/http server
s.initGrpcServer(args.KeepaliveOptions)
s.httpServer = &http.Server{
Addr: args.DiscoveryOptions.HTTPAddr,
Handler: s.mux,
}
// create http listener
listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
s.HTTPListeningAddr = listener.Addr()
// create grpc listener
grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
s.GRPCListeningAddr = grpcListener.Addr()
s.addStartFunc(func(stop <-chan struct{}) error {
// start the HTTP server goroutine
// start the gRPC server goroutine
// goroutine that waits for shutdown
return nil
})
// run secure grpc server
return nil
}
istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go
The envoyv2.NewDiscoveryServer function is defined as:
// s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
// environment,
// istio_networking.NewConfigGenerator(args.Plugins),
// s.ServiceController,
// s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
// create the DiscoveryServer object
out := &DiscoveryServer{
Env: env,
ConfigGenerator: generator, // generates xDS responses
ConfigController: configCache,
EndpointShardsByService: map[string]*EndpointShards{},
WorkloadsByID: map[string]*Workload{},
edsUpdates: map[string]*EndpointShards{},
concurrentPushLimit: make(chan struct{}, 20),
updateChannel: make(chan *updateReq, 10),
}
env.PushContext = model.NewPushContext()
// handle the related update events
// handleUpdates processes events from updateChannel; it ensures that at least minQuiet
// has passed since the last processed event, and that at most maxDelay passes between
// receiving an event and processing it.
// It then calls doPush, which, depending on the full-push flag, sends the recently
// updated EDS information either as an XDS incremental push or as a full push.
go out.handleUpdates()
// The following three cases clear the DiscoveryServer's local caches, and
// cache-clearing functions are registered for them:
// 1. service-related information changes
// 2. the JWT public key changes
// 3. an Istio CRD changes
// when any of these happens, an update is fed into out.handleUpdates()
// periodic refresh
go out.periodicRefresh()
// periodically refresh metrics
go out.periodicRefreshMetrics()
out.DebugConfigs = pilot.DebugConfigs
pushThrottle := intEnv(pilot.PushThrottle, 10)
pushBurst := intEnv(pilot.PushBurst, 100)
adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)
return out
}
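The handleUpdates comments above describe a debounce: wait until the event stream has been quiet for minQuiet, but never delay a pending push by more than maxDelay. A minimal sketch of such a debounce loop (hypothetical names, not Pilot's exact code):
import "time"

// debounce batches events from ch and calls push once no event has arrived for
// minQuiet, or once maxDelay has elapsed since the first event of the batch.
func debounce(ch <-chan struct{}, minQuiet, maxDelay time.Duration, push func()) {
	var quietTimer, maxTimer <-chan time.Time
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
			quietTimer = time.After(minQuiet) // restart the quiet-period timer on every event
			if maxTimer == nil {
				maxTimer = time.After(maxDelay) // arm the max-delay timer on the first event of a batch
			}
		case <-quietTimer: // quiet long enough: flush the batch
			push()
			quietTimer, maxTimer = nil, nil
		case <-maxTimer: // events kept arriving: flush anyway
			push()
			quietTimer, maxTimer = nil, nil
		}
	}
}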
ADS-related definitions:
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto
// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
// This is a gRPC-only API.
rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
returns (stream envoy.api.v2.DiscoveryResponse) {
}
rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
}
}
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto
// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
// The version_info provided in the request messages will be the version_info
// received with the most recent successfully processed response or empty on
// the first request. It is expected that no new request is sent after a
// response is received until the Envoy instance is ready to ACK/NACK the new
// configuration. ACK/NACK takes place by returning the new API config version
// as applied or the previous API config version respectively. Each type_url
// (see below) has an independent version associated with it.
string version_info = 1;
// The node making the request.
core.Node node = 2;
// List of resources to subscribe to, e.g. list of cluster names or a route
// configuration name. If this is empty, all resources for the API are
// returned. LDS/CDS expect empty resource_names, since this is global
// discovery for the Envoy instance. The LDS and CDS responses will then imply
// a number of resources that need to be fetched via EDS/RDS, which will be
// explicitly enumerated in resource_names.
repeated string resource_names = 3;
// Type of the resource that is being requested, e.g.
// "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
// in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
// required for ADS.
string type_url = 4;
// nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
// discussion on version_info and the DiscoveryResponse nonce comment. This
// may be empty if no nonce is available, e.g. at startup or for non-stream
// xDS implementations.
string response_nonce = 5;
// This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
// failed to update configuration. The *message* field in *error_details* provides the Envoy
// internal exception related to the failure. It is only intended for consumption during manual
// debugging, the string provided is not guaranteed to be stable across Envoy versions.
google.rpc.Status error_detail = 6;
}
message DiscoveryResponse {
// The version of the response data.
string version_info = 1;
// The response resources. These resources are typed and depend on the API being called.
repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];
// [#not-implemented-hide:]
// Canary is used to support two Envoy command line flags:
//
// * --terminate-on-canary-transition-failure. When set, Envoy is able to
// terminate if it detects that configuration is stuck at canary. Consider
// this example sequence of updates:
// - Management server applies a canary config successfully.
// - Management server rolls back to a production config.
// - Envoy rejects the new production config.
// Since there is no sensible way to continue receiving configuration
// updates, Envoy will then terminate and apply production config from a
// clean slate.
// * --dry-run-canary. When set, a canary response will never be applied, only
// validated via a dry run.
bool canary = 3;
// Type URL for resources. This must be consistent with the type_url in the
// Any messages for resources if resources is non-empty. This effectively
// identifies the xDS API when muxing over ADS.
string type_url = 4;
// For gRPC based subscriptions, the nonce provides a way to explicitly ack a
// specific DiscoveryResponse in a following DiscoveryRequest. Additional
// messages may have been sent by Envoy to the management server for the
// previous version on the stream prior to this DiscoveryResponse, that were
// unprocessed at response send time. The nonce allows the management server
// to ignore any further DiscoveryRequests for the previous version until a
// DiscoveryRequest bearing the nonce. The nonce is optional and is not
// required for non-stream based xDS implementations.
string nonce = 5;
// [#not-implemented-hide:]
// The control plane instance that sent the response.
core.ControlPlane control_plane = 6;
}
For how ADS keeps configuration consistent, see the "eventual consistency considerations" section of envoy-的-xds-rest-和-grpc-协议详解:
In general, to avoid traffic drops, updates should follow the make-before-break model, in which:
* CDS updates (if any) must always be pushed first.
* EDS updates (if any) must arrive after the CDS update for the corresponding clusters.
* LDS updates must arrive after the corresponding CDS/EDS updates.
* RDS updates related to newly added listeners must arrive last.
* Finally, stale CDS clusters and the related EDS endpoints (those no longer referenced) are removed.
ADS allows a single management server to deliver all API updates over a single gRPC stream; combined with carefully sequenced updates, this avoids traffic loss during configuration changes. A sketch of a push loop that respects this order follows.
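A hypothetical, heavily simplified push loop in that spirit, using stand-in types rather than Pilot's real per-connection state (CDSWatch, Clusters, LDSWatch, Routes in ads.go below):
// adsConn is a stand-in for the per-connection watch state.
type adsConn struct {
	cdsWatched, ldsWatched bool
	clusters, routes       []string
}

// pushAll pushes a full update in make-before-break order:
// clusters, then endpoints for the watched clusters, then listeners, then routes.
func pushAll(c *adsConn, push func(typeURL string) error) error {
	if c.cdsWatched {
		if err := push("type.googleapis.com/envoy.api.v2.Cluster"); err != nil {
			return err
		}
	}
	if len(c.clusters) > 0 {
		if err := push("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"); err != nil {
			return err
		}
	}
	if c.ldsWatched {
		if err := push("type.googleapis.com/envoy.api.v2.Listener"); err != nil {
			return err
		}
	}
	if len(c.routes) > 0 {
		return push("type.googleapis.com/envoy.api.v2.RouteConfiguration")
	}
	return nil
}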
pilot/pkg/proxy/envoy/v2/ads.go
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// ...
// InitContext returns immediately if the context was already initialized.
// InitContext pulls the data that needs to be sent to clients out of env; analyzed further below
// 1. initServiceRegistry
// 2. initVirtualServices
// 3. initDestinationRules
// 4. initAuthorizationPolicies
// 5. InitSidecarScopes
err := s.globalPushContext().InitContext(s.Env)
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// start a new goroutine to receive xdsapi.DiscoveryRequest messages from the client
go receiveThread(con, reqChannel, &receiveError)
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
err = s.initConnectionNode(discReq, con)
switch discReq.TypeUrl {
case ClusterType:
// ...
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
case ListenerType:
// ...
adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
con.LDSWatch = true
err := s.pushLds(con, s.globalPushContext(), true, versionInfo())
case RouteType:
// ...
adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
err := s.pushRoute(con, s.globalPushContext())
case EndpointType:
// ...
// various error handling
for _, cn := range con.Clusters {
s.removeEdsCon(cn, con.ConID, con)
}
for _, cn := range clusters {
s.addEdsCon(cn, con.ConID, con)
}
con.Clusters = clusters
adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
err := s.pushEds(s.globalPushContext(), con, true, nil)
if err != nil {
return err
}
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
// ...
case pushEv := <-con.pushChannel:
// ...
err := s.pushConnection(con, pushEv)
if err != nil {
return nil
}
}
}
}
istio.io/istio/pilot/pkg/model/push_context.go
// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {
// privateServices are reachable within the same namespace.
privateServicesByNamespace map[string][]*Service
// publicServices are services reachable within the mesh.
publicServices []*Service
privateVirtualServicesByNamespace map[string][]Config
publicVirtualServices []Config
// destination rules are of three types:
// namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
// namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
// allExportedDestRules: all (public) dest rules across all namespaces
// We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
// the dest rule based on the most specific host match, and not just any destination rule
namespaceLocalDestRules map[string]*processedDestRules
namespaceExportedDestRules map[string]*processedDestRules
allExportedDestRules *processedDestRules
// sidecars for each namespace
sidecarsByNamespace map[string][]*SidecarScope
////////// END ////////
// The following data is either a global index or used in the inbound path.
// Namespace specific views do not apply here.
// ServiceByHostname has all services, indexed by hostname.
ServiceByHostname map[Hostname]*Service `json:"-"`
// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
// are no authorization policies in the cluster.
AuthzPolicies *AuthorizationPolicies `json:"-"`
// ServicePort2Name is used to keep track of service name and port mapping.
// This is needed because ADS names use port numbers, while endpoints use
// port names. The key is the service name. If a service or port are not found,
// the endpoint needs to be re-evaluated later (eventual consistency)
ServicePort2Name map[string]PortList `json:"-"`
initDone bool
}
// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
ps.Mutex.Lock()
defer ps.Mutex.Unlock()
if ps.initDone {
return nil
}
ps.Env = env
var err error
// Caches list of services in the registry, and creates a map
// of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
ps.initServiceRegistry(env)
// Caches list of virtual services -> publicVirtualServices []Config
ps.initVirtualServices(env)
// Split out of DestinationRule expensive conversions - once per push.
// the results are stored in the following three fields:
// * namespaceLocalDestRules map[string]*processedDestRules
// * namespaceExportedDestRules map[string]*processedDestRules
// * allExportedDestRules *processedDestRules
ps.initDestinationRules(env)
// Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
ps.initAuthorizationPolicies(env)
// Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
ps.InitSidecarScopes(env)
ps.initDone = true
return nil
}
CDS is the first message sent over ADS; below we use CDS as the example for a detailed walk-through.
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// ...
// InitContext returns immediately if the context was already initialized.
// InitContext pulls the data that needs to be sent to clients out of env; analyzed further below
// 1. initServiceRegistry
// 2. initVirtualServices
// 3. initDestinationRules
// 4. initAuthorizationPolicies
// 5. InitSidecarScopes
err := s.globalPushContext().InitContext(s.Env)
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// start a new goroutine to receive xdsapi.DiscoveryRequest messages from the client
go receiveThread(con, reqChannel, &receiveError)
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
// mainly calls the `ParseServiceNodeWithMetadata` function,
// which extracts the various fields from the request message and produces a `Proxy` object
// the current discReq.Node.Id format is Type~IPAddress~ID~Domain
err = s.initConnectionNode(discReq, con)
switch discReq.TypeUrl {
case ClusterType:
// handling of the response message if CDS data has already been sent
if con.CDSWatch {
// ...
}
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
//...
}
core.Node
Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.
{
"id": "…",
"cluster": "…",
"metadata": "{…}",
"locality": "{…}",
"build_version": "…"
}
- id (string): An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.
- cluster (string): Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.
- metadata (Struct): Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.
- locality (core.Locality): Locality specifying where the Envoy instance is running.
- build_version (string): This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.
In the initConnectionNode function, the main work is calling ParseServiceNodeWithMetadata, which extracts the various fields from the request message and produces a Proxy object;
istio.io/istio/pilot/pkg/model/context.go
func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
// ClusterID specifies the cluster where the proxy resides.
// TODO: clarify if this is needed in the new 'network' model, likely needs to
// be renamed to 'network'
ClusterID string
// Type specifies the node type. First part of the ID.
Type NodeType
// IPAddresses is the IP addresses of the proxy used to identify it and its
// co-located service instances. Example: "10.60.1.6". In some cases, the host
// where the poxy and service instances reside may have more than one IP address
IPAddresses []string
// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
// namespace.
ID string
// Locality is the location of where Envoy proxy runs.
Locality Locality
// DNSDomain defines the DNS domain suffix for short hostnames (e.g.
// "default.svc.cluster.local")
DNSDomain string
// TrustDomain defines the trust domain of the certificate
TrustDomain string
// ConfigNamespace defines the namespace where this proxy resides
// for the purposes of network scoping.
// NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
ConfigNamespace string
// Metadata key-value pairs extending the Node identifier
Metadata map[string]string
// the sidecarScope associated with the proxy
SidecarScope *SidecarScope
}
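Since the node ID is a 4-part '~' delimited string (Type~IPAddress~ID~Domain), the core of the ID handling in ParseServiceNodeWithMetadata boils down to a split plus validation. A minimal sketch under that assumption (the real function also handles metadata, multiple IP addresses, and so on):
import (
	"fmt"
	"strings"
)

// parseServiceNode sketches the ID handling: split the 4-part ID
// "Type~IPAddress~ID~Domain" into the corresponding Proxy fields.
func parseServiceNode(s string) (nodeType, ip, id, domain string, err error) {
	parts := strings.Split(s, "~")
	if len(parts) != 4 {
		return "", "", "", "", fmt.Errorf("missing parts in the service node %q", s)
	}
	return parts[0], parts[1], parts[2], parts[3], nil
}
For a sidecar the ID would look something like sidecar~10.1.2.3~mypod.default~default.svc.cluster.local (hypothetical values).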
The core of the CDS push is the call s.pushCds(con, s.globalPushContext(), versionInfo()):
istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
// generate the rawClusters object ([]*xdsapi.Cluster) from the current con.modelNode and the push context
rawClusters, err := s.generateRawClusters(con.modelNode, push)
// DebugConfigs controls saving snapshots of configs for /debug/adsz.
// Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
// if this option is enabled via the env var, the details can be viewed at /debug/adsz on port 9093, at the cost of extra memory
if s.DebugConfigs {
con.CDSClusters = rawClusters
}
response := con.clusters(rawClusters)
err = con.send(response)
return nil
}
So the role of s.generateRawClusters(con.modelNode, push) is clear: it collects the CDS-related data from the push context and packages it into the CDS response format.
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)
// validate the information in rawClusters
return rawClusters, nil
}
The rawClusters object is produced by s.ConfigGenerator.BuildClusters from the combination of s.Env, node, and push:
- s.Env holds the local caches of the ServiceController, ConfigController and other resources
- node is the object built from this request, containing the node-related information
- push is the context for this push, with the needed information already pre-organized (derived from s.Env)
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
The BuildClusters function is implemented as follows:
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
clusters := make([]*apiv2.Cluster, 0)
switch proxy.Type {
case model.SidecarProxy:
// GetProxyServiceInstances returns service instances co-located with the proxy
// get the service instances on the proxy's host, including headless services
instances, err := env.GetProxyServiceInstances(proxy)
sidecarScope := proxy.SidecarScope
recomputeOutboundClusters := true
// append the outbound clusters
if recomputeOutboundClusters {
clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
}
// Let ServiceDiscovery decide which IP and Port are used for management if
// there are multiple IPs
managementPorts := make([]*model.Port, 0)
for _, ip := range proxy.IPAddresses {
managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
}
// append the inbound clusters for the managementPorts on the proxy IPs
clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)
default: // Gateways
// ...
}
// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
// DO NOT CALL PLUGINS for these two clusters.
clusters = append(clusters, buildBlackHoleCluster())
clusters = append(clusters, buildDefaultPassthroughCluster())
// resolves cluster name conflicts.
return normalizeClusters(push, proxy, clusters), nil
}
This is a temporary debugging facility mounted on port 9093; whether the ADS-related information is actually saved is further controlled by a Pilot option, see
istio.io/istio/pkg/features/pilot/pilot.go
// DebugConfigs controls saving snapshots of configs for /debug/adsz.
// Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
// For larger clusters it can increase memory use and GC - useful for small tests.
DebugConfigs = os.Getenv("PILOT_DEBUG_ADSZ_CONFIG") == "1"
istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go
// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
// For debugging and load testing v2 we add an memory registry.
s.MemRegistry = NewMemServiceDiscovery(
map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService,
}, 2)
s.MemRegistry.EDSUpdater = s
s.MemRegistry.ClusterID = "v2-debug"
sctl.AddRegistry(aggregate.Registry{
ClusterID: "v2-debug",
Name: serviceregistry.ServiceRegistry("memAdapter"),
ServiceDiscovery: s.MemRegistry,
ServiceAccounts: s.MemRegistry,
Controller: s.MemRegistry.controller,
})
mux.HandleFunc("/ready", s.ready)
mux.HandleFunc("/debug/edsz", s.edsz)
mux.HandleFunc("/debug/adsz", s.adsz)
mux.HandleFunc("/debug/cdsz", cdsz)
mux.HandleFunc("/debug/syncz", Syncz)
mux.HandleFunc("/debug/registryz", s.registryz)
mux.HandleFunc("/debug/endpointz", s.endpointz)
mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
mux.HandleFunc("/debug/workloadz", s.workloadz)
mux.HandleFunc("/debug/configz", s.configz)
mux.HandleFunc("/debug/authenticationz", s.authenticationz)
mux.HandleFunc("/debug/config_dump", s.ConfigDump)
mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}
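Since whether these handlers return full config snapshots depends on PILOT_DEBUG_ADSZ_CONFIG as noted above, a quick way to inspect them is the request subcommand from the help output earlier, e.g. something like pilot-discovery request GET /debug/adsz run inside the discovery container, or curl against the corresponding port on the Pilot pod.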
PushContext initialization
istio.io/istio/pilot/pkg/model/push_context.go
func (ps *PushContext) InitContext(env *Environment) error {
// initialized only once; returns immediately if already initialized
if ps.initDone {
return nil
}
ps.initServiceRegistry(env)
ps.initVirtualServices(env)
ps.initDestinationRules(env)
ps.initAuthorizationPolicies(env)
ps.InitSidecarScopes(env)
}
Here env holds all of the information available, including the ServiceDiscovery interface.
istio.io/istio/pilot/pkg/model/context.go
// Environment provides an aggregate environmental API for Pilot
type Environment struct {
// Discovery interface for listing services and instances.
ServiceDiscovery
ServiceAccounts
IstioConfigStore
Mesh *meshconfig.MeshConfig
MixerSAN []string
PushContext *PushContext
MeshNetworks *meshconfig.MeshNetworks
}
- initServiceRegistry
  - reads from env.Services() -> ServiceDiscovery::Services
  - fields touched:
    - privateServicesByNamespace: keyed by namespace
    - publicServices: a list
    - ServiceByHostname: s.Hostname -> Service
    - ServicePort2Name: s.Hostname -> Ports
- initVirtualServices
  - reads from env.List(VirtualService.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
  - converts the virtual services' host shortnames into FQDNs
  - fields touched:
    - privateVirtualServicesByNamespace: keyed by namespace
    - publicVirtualServices: a list
  - related definitions: VirtualService -> (Hosts, []*HTTPRoute) -> (HTTPMatchRequest, HTTPRouteDestination), for example:

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: reviews-route
spec:
  hosts:
  - reviews.prod.svc.cluster.local
  http: # HTTPRoute
  - match:
    - uri:
        prefix: "/wpcatalog"
    - uri:
        prefix: "/consumercatalog"
    rewrite:
      uri: "/newcatalog"
    route:
    - destination: # HTTPRouteDestination
        host: reviews.prod.svc.cluster.local
        subset: v2
  - route:
    - destination:
        host: reviews.prod.svc.cluster.local
        subset: v1
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: reviews-destination
spec:
  host: reviews.prod.svc.cluster.local
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2

- initDestinationRules
  - reads from env.List(DestinationRule.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
  - fields touched:
    - namespaceLocalDestRules: ns -> processedDestRules, ConfigScope == PRIVATE
    - namespaceExportedDestRules: ns -> processedDestRules, ConfigScope == PUBLIC
    - allExportedDestRules: a list
- initAuthorizationPolicies
  - reads from env.IstioConfigStore.ClusterRbacConfig()
  - field touched: AuthzPolicies
- InitSidecarScopes
  - reads from env.List(Sidecar.Type, NamespaceAll)
  - field touched: sidecarsByNamespace: ns -> SidecarScope
CDS initialization flow in detail
The call chain is DiscoveryServer::pushCds -> DiscoveryServer::generateRawClusters -> ConfigGenerator.BuildClusters, and the main body is implemented in BuildClusters.
BuildClusters
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution: For inbound (sidecar only):
// Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
clusters := make([]*apiv2.Cluster, 0)
switch proxy.Type {
case model.SidecarProxy:
// 1. get the services listened on by the proxy's Pod; in k8s the Pod is looked up via proxyIP
// mainly used to generate the inbound clusters
// GetProxyServiceInstances returns the service instances that
// co-located(in the same network namespace and security context)
// with a given Proxy
instances, err := env.GetProxyServiceInstances(proxy)
sidecarScope := proxy.SidecarScope
recomputeOutboundClusters := true
if configgen.CanUsePrecomputedCDS(proxy) {
// if a precomputed result is cached, use it directly
}
}
// 2. if there is no cache, compute configgen.buildOutboundClusters(env, proxy, push) directly
if recomputeOutboundClusters {
clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
}
// Let ServiceDiscovery decide which IP and Port are used for management if
// there are multiple IPs
// managementPorts is mainly the Liveness && Readiness probe ports taken from the Pod
managementPorts := make([]*model.Port, 0)
for _, ip := range proxy.IPAddresses {
managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
}
//
clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)
// ...
// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
// DO NOT CALL PLUGINS for these two clusters.
// add the blackhole and passthrough clusters
clusters = append(clusters, buildBlackHoleCluster())
clusters = append(clusters, buildDefaultPassthroughCluster())
}
Taking the multi-version HelloWorld deployment as an example, the generated output looks like the following; the full sample is in istio_helloworld_v1_v2.json
"clusters": {
"dynamic_active_clusters": [
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
"name": "BlackHoleCluster",
"connect_timeout": "1s"
},
"last_updated": "2019-02-13T09:28:37.971Z"
},
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
// fmt.Sprintf("%s|%d|%s|%s", direction, port, subsetName, hostname)
"name": "inbound|5000||helloworld.default.svc.cluster.local",
"connect_timeout": "1s",
"hosts": [
{
"socket_address": {
"address": "127.0.0.1",
"port_value": 5000
}
}
],
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T09:28:37.971Z"
},
{
"version_info": "2019-02-13T12:41:46Z/10",
"cluster": {
"name": "outbound|5000|v1|helloworld.default.svc.cluster.local",
"type": "EDS",
"eds_cluster_config": {
"eds_config": {
"ads": {} // ESS 类型的,需要通过 EDS 来进行获取 !!!
},
"service_name": "outbound|5000|v1|helloworld.default.svc.cluster.local"
},
"connect_timeout": "1s",
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T12:41:46.640Z"
},
{
"version_info": "2019-02-13T12:41:46Z/10",
"cluster": {
"name": "outbound|5000|v2|helloworld.default.svc.cluster.local",
"type": "EDS",
"eds_cluster_config": {
"eds_config": {
"ads": {}
},
"service_name": "outbound|5000|v2|helloworld.default.svc.cluster.local"
},
"connect_timeout": "1s",
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T12:41:46.641Z"
},
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
"name": "outbound|5000||helloworld.default.svc.cluster.local",
"type": "EDS",
"eds_cluster_config": {
"eds_config": {
"ads": {}
},
"service_name": "outbound|5000||helloworld.default.svc.cluster.local"
},
"connect_timeout": "1s",
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T09:28:37.928Z"
},
]
},
}
GetProxyServiceInstances
The kube implementation of GetProxyServiceInstances finds the corresponding services through the Pod that owns the proxy IP; the code is as follows:
istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go
// GetProxyServiceInstances returns service instances co-located with a given proxy
func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
out := make([]*model.ServiceInstance, 0)
// There is only one IP for kube registry
proxyIP := proxy.IPAddresses[0]
proxyNamespace := ""
pod := c.pods.getPodByIP(proxyIP)
if pod != nil {
proxyNamespace = pod.Namespace
// 1. find proxy service by label selector,
// if not any, there may exist headless service
// failover to 2
svcLister := listerv1.NewServiceLister(c.services.informer.GetIndexer())
if services, err := svcLister.GetPodServices(pod); err == nil && len(services) > 0 {
for _, svc := range services {
item, exists, err := c.endpoints.informer.GetStore().GetByKey(KeyFunc(svc.Namespace, svc.Name))
ep := *item.(*v1.Endpoints)
out = append(out, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
}
return out, nil
}
}
// 2. Headless service
endpointsForPodInSameNS := make([]*model.ServiceInstance, 0)
endpointsForPodInDifferentNS := make([]*model.ServiceInstance, 0)
for _, item := range c.endpoints.informer.GetStore().List() {
ep := *item.(*v1.Endpoints)
endpoints := &endpointsForPodInSameNS
if ep.Namespace != proxyNamespace {
endpoints = &endpointsForPodInDifferentNS
}
*endpoints = append(*endpoints, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
}
// Put the endpointsForPodInSameNS in front of endpointsForPodInDifferentNS so that Pilot will
// first use endpoints from endpointsForPodInSameNS. This makes sure if there are two endpoints
// referring to the same IP/port, the one in endpointsForPodInSameNS will be used. (The other one
// in endpointsForPodInDifferentNS will thus be rejected by Pilot).
out = append(endpointsForPodInSameNS, endpointsForPodInDifferentNS...)
if len(out) == 0 {
if c.Env != nil {
c.Env.PushContext.Add(model.ProxyStatusNoService, proxy.ID, proxy, "")
status := c.Env.PushContext
} else {}
}
return out, nil
}
buildOutboundClusters
It finds the Services visible to the proxy and the public Services, and turns them into the corresponding Clusters.
func (configgen *ConfigGeneratorImpl) buildOutboundClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) []*apiv2.Cluster {
clusters := make([]*apiv2.Cluster, 0)
inputParams := &plugin.InputParams{
Env: env,
Push: push,
Node: proxy,
}
networkView := model.GetNetworkView(proxy)
// push.Services(proxy) returns the private and public services visible to this proxy
for _, service := range push.Services(proxy) {
config := push.DestinationRule(proxy, service)
for _, port := range service.Ports {
if port.Protocol == model.ProtocolUDP {
continue
}
inputParams.Service = service
inputParams.Port = port
lbEndpoints := buildLocalityLbEndpoints(env, networkView, service, port.Port, nil)
// create default cluster
// outbound|5000||helloworld.default.svc.cluster.local
discoveryType := convertResolution(service.Resolution)
clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
serviceAccounts := env.ServiceAccounts.GetIstioServiceAccounts(service.Hostname, []int{port.Port})
defaultCluster := buildDefaultCluster(env, clusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)
updateEds(defaultCluster)
setUpstreamProtocol(defaultCluster, port)
clusters = append(clusters, defaultCluster)
// if the service has a corresponding destinationRule, the subsets defined in the
// destinationRule also need to be added to the set of outbound clusters
// outbound|5000|v1|helloworld.default.svc.cluster.local
// outbound|5000|v2|helloworld.default.svc.cluster.local
// v1 and v2 are subset names
if config != nil {
destinationRule := config.Spec.(*networking.DestinationRule)
defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
applyTrafficPolicy(env, defaultCluster, destinationRule.TrafficPolicy, port, serviceAccounts,
defaultSni, DefaultClusterMode, model.TrafficDirectionOutbound)
setLocalityPriority := false
if defaultCluster.OutlierDetection != nil {
setLocalityPriority = true
}
applyLocalityLBSetting(proxy, defaultCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
for _, subset := range destinationRule.Subsets {
inputParams.Subset = subset.Name
subsetClusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)
defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)
// clusters with discovery type STATIC, STRICT_DNS or LOGICAL_DNS rely on cluster.hosts field
// ServiceEntry's need to filter hosts based on subset.labels in order to perform weighted routing
if discoveryType != apiv2.Cluster_EDS && len(subset.Labels) != 0 {
lbEndpoints = buildLocalityLbEndpoints(env, networkView, service, port.Port, []model.Labels{subset.Labels})
}
subsetCluster := buildDefaultCluster(env, subsetClusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)
updateEds(subsetCluster)
setUpstreamProtocol(subsetCluster, port)
applyTrafficPolicy(env, subsetCluster, destinationRule.TrafficPolicy, port, serviceAccounts, defaultSni,
DefaultClusterMode, model.TrafficDirectionOutbound)
applyTrafficPolicy(env, subsetCluster, subset.TrafficPolicy, port, serviceAccounts, defaultSni,
DefaultClusterMode, model.TrafficDirectionOutbound)
setLocalityPriority = false
if subsetCluster.OutlierDetection != nil {
setLocalityPriority = true
}
applyLocalityLBSetting(proxy, subsetCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
// call plugins
for _, p := range configgen.Plugins {
p.OnOutboundCluster(inputParams, subsetCluster)
}
clusters = append(clusters, subsetCluster)
}
}
// call plugins for the default cluster
for _, p := range configgen.Plugins {
p.OnOutboundCluster(inputParams, defaultCluster)
}
}
}
return clusters
}
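The cluster names that appear in the CDS dump above come from model.BuildSubsetKey; judging from the comment quoted earlier (fmt.Sprintf("%s|%d|%s|%s", direction, port, subsetName, hostname)), the naming scheme is essentially the sketch below (an assumption based on that comment, not the exact helper):
import "fmt"

// buildSubsetKey mirrors the direction|port|subset|hostname convention.
func buildSubsetKey(direction string, subset string, hostname string, port int) string {
	return fmt.Sprintf("%s|%d|%s|%s", direction, port, subset, hostname)
}

// buildSubsetKey("outbound", "v1", "helloworld.default.svc.cluster.local", 5000)
// => "outbound|5000|v1|helloworld.default.svc.cluster.local"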
buildInboundClusters
buildInboundClusters also needs to add the k8s management-port information. The ManagementPorts function looks up the Pod with the same IP as the proxy and then, from the Pod spec, extracts the ports defined in the Liveness and Readiness probes;
// ManagementPorts implements a service catalog operation
// addr is the proxy's IP address
func (c *Controller) ManagementPorts(addr string) model.PortList {
pod := c.pods.getPodByIP(addr)
if pod == nil {
return nil
}
managementPorts, err := convertProbesToPorts(&pod.Spec)
return managementPorts
}
// convertProbesToPorts
// convertProbesToPorts returns a PortList consisting of the ports where the
// pod is configured to do Liveness and Readiness probes
func convertProbesToPorts(t *v1.PodSpec) (model.PortList, error) {
set := make(map[string]*model.Port)
for _, container := range t.Containers {
for _, probe := range []*v1.Probe{container.LivenessProbe, container.ReadinessProbe} {
p, err := convertProbePort(&container, &probe.Handler)
if err != nil {
errs = multierror.Append(errs, err)
} else if p != nil && set[p.Name] == nil {
// Deduplicate along the way. We don't differentiate between HTTP vs TCP mgmt ports
set[p.Name] = p
}
}
}
mgmtPorts := make(model.PortList, 0, len(set))
for _, p := range set {
mgmtPorts = append(mgmtPorts, p)
}
sort.Slice(mgmtPorts, func(i, j int) bool { return mgmtPorts[i].Port < mgmtPorts[j].Port })
return mgmtPorts, errs
}
The main body of buildInboundClusters is implemented as follows:
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
func (configgen *ConfigGeneratorImpl) buildInboundClusters(env *model.Environment, proxy *model.Proxy,
push *model.PushContext, instances []*model.ServiceInstance, managementPorts []*model.Port) []*apiv2.Cluster {
clusters := make([]*apiv2.Cluster, 0)
sidecarScope := proxy.SidecarScope
noneMode := proxy.GetInterceptionMode() == model.InterceptionNone
// if no sidecarScope with custom ingress listeners is configured
if sidecarScope == nil || !sidecarScope.HasCustomIngressListeners {
// ...
// build the inbound clusters for the given instances
for _, instance := range instances {
pluginParams := &plugin.InputParams{
Env: env,
Node: proxy,
ServiceInstance: instance,
Port: instance.Endpoint.ServicePort,
Push: push,
Bind: LocalhostAddress, // set the host to 127.0.0.1
}
localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
clusters = append(clusters, localCluster)
}
// add the management ports; the clusterName format is inbound|port_name|mgmCluster|port
// Add a passthrough cluster for traffic to management ports (health check ports)
for _, port := range managementPorts {
clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, port.Name,
ManagementClusterHostname, port.Port)
localityLbEndpoints := buildInboundLocalityLbEndpoints(LocalhostAddress, port.Port)
mgmtCluster := buildDefaultCluster(env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
model.TrafficDirectionInbound)
setUpstreamProtocol(mgmtCluster, port)
clusters = append(clusters, mgmtCluster)
}
} else {
// ....
}
return clusters
}
Next, the various pieces of information are assembled into the plugin input structure (plugin.InputParams) and passed to buildInboundClusterForPortOrUDS for the final assembly. The Plugin interface involved is defined as:
// Plugin is called during the construction of a xdsapi.Listener which may alter the Listener in any
// way. Examples include AuthenticationPlugin that sets up mTLS authentication on the inbound Listener
// and outbound Cluster, the mixer plugin that sets up policy checks on the inbound listener, etc.
type Plugin interface {
// OnOutboundListener is called whenever a new outbound listener is added to the LDS output for a given service.
// Can be used to add additional filters on the outbound path.
OnOutboundListener(in *InputParams, mutable *MutableObjects) error
// OnInboundListener is called whenever a new listener is added to the LDS output for a given service
// Can be used to add additional filters.
OnInboundListener(in *InputParams, mutable *MutableObjects) error
// OnOutboundCluster is called whenever a new cluster is added to the CDS output.
// This is called once per push cycle, and not for every sidecar/gateway, except for gateways with non-standard
// operating modes.
OnOutboundCluster(in *InputParams, cluster *xdsapi.Cluster)
// OnInboundCluster is called whenever a new cluster is added to the CDS output.
// Called for each sidecar
OnInboundCluster(in *InputParams, cluster *xdsapi.Cluster)
// OnOutboundRouteConfiguration is called whenever a new set of virtual hosts (a set of virtual hosts with routes) is
// added to RDS in the outbound path.
OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)
// OnInboundRouteConfiguration is called whenever a new set of virtual hosts are added to the inbound path.
OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)
// OnInboundFilterChains is called whenever a plugin needs to setup the filter chains, including relevant filter chain
// configuration, like FilterChainMatch and TLSContext.
OnInboundFilterChains(in *InputParams) []FilterChain
}
The buildInboundClusterForPortOrUDS function runs the final leg of assembling the cluster object:
/*
for _, instance := range instances {
pluginParams := &plugin.InputParams{
Env: env,
Node: proxy,
ServiceInstance: instance,
Port: instance.Endpoint.ServicePort,
Push: push,
Bind: LocalhostAddress,
}
localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
clusters = append(clusters, localCluster)
}
*/
func (configgen *ConfigGeneratorImpl) buildInboundClusterForPortOrUDS(pluginParams *plugin.InputParams) *apiv2.Cluster {
instance := pluginParams.ServiceInstance
clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, instance.Endpoint.ServicePort.Name,
instance.Service.Hostname, instance.Endpoint.ServicePort.Port)
// set the locally bound address information
localityLbEndpoints := buildInboundLocalityLbEndpoints(pluginParams.Bind, instance.Endpoint.Port)
localCluster := buildDefaultCluster(pluginParams.Env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
model.TrafficDirectionInbound)
setUpstreamProtocol(localCluster, instance.Endpoint.ServicePort)
// call plugins to notify the relevant handlers
for _, p := range configgen.Plugins {
p.OnInboundCluster(pluginParams, localCluster)
}
// When users specify circuit breakers, they need to be set on the receiver end
// (server side) as well as client side, so that the server has enough capacity
// (not the defaults) to handle the increased traffic volume
// DestinationRule returns a destination rule for a service name in a given domain.
config := pluginParams.Push.DestinationRule(pluginParams.Node, instance.Service)
if config != nil {
destinationRule := config.Spec.(*networking.DestinationRule)
if destinationRule.TrafficPolicy != nil {
// only connection pool settings make sense on the inbound path.
// upstream TLS settings/outlier detection/load balancer don't apply here.
applyConnectionPool(pluginParams.Env, localCluster, destinationRule.TrafficPolicy.ConnectionPool,
model.TrafficDirectionInbound)
}
}
return localCluster
}
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
applyConnectionPool fills in the ConnectionPool-related parts of the cluster based on the service's corresponding destinationRule; the function is defined as follows:
// FIXME: there isn't a way to distinguish between unset values and zero values
func applyConnectionPool(env *model.Environment, cluster *apiv2.Cluster, settings *networking.ConnectionPoolSettings, direction model.TrafficDirection) {
threshold := GetDefaultCircuitBreakerThresholds(direction)
if settings.Http != nil {
if settings.Http.Http2MaxRequests > 0 {
// Envoy only applies MaxRequests in HTTP/2 clusters
threshold.MaxRequests = &types.UInt32Value{Value: uint32(settings.Http.Http2MaxRequests)}
}
if settings.Http.Http1MaxPendingRequests > 0 {
// Envoy only applies MaxPendingRequests in HTTP/1.1 clusters
threshold.MaxPendingRequests = &types.UInt32Value{Value: uint32(settings.Http.Http1MaxPendingRequests)}
}
if settings.Http.MaxRequestsPerConnection > 0 {
cluster.MaxRequestsPerConnection = &types.UInt32Value{Value: uint32(settings.Http.MaxRequestsPerConnection)}
}
// FIXME: zero is a valid value if explicitly set, otherwise we want to use the default
if settings.Http.MaxRetries > 0 {
threshold.MaxRetries = &types.UInt32Value{Value: uint32(settings.Http.MaxRetries)}
}
}
if settings.Tcp != nil {
if settings.Tcp.ConnectTimeout != nil {
cluster.ConnectTimeout = util.GogoDurationToDuration(settings.Tcp.ConnectTimeout)
}
if settings.Tcp.MaxConnections > 0 {
threshold.MaxConnections = &types.UInt32Value{Value: uint32(settings.Tcp.MaxConnections)}
}
applyTCPKeepalive(env, cluster, settings)
}
cluster.CircuitBreakers = &v2Cluster.CircuitBreakers{
Thresholds: []*v2Cluster.CircuitBreakers_Thresholds{threshold},
}
}
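To make the mapping concrete, here is a minimal, self-contained sketch of the behaviour described above: the defaults are kept and only fields explicitly set (> 0) in the connectionPool override them. The struct names and default values are illustrative stand-ins, not the actual Istio/Envoy protobuf types:

package main

import "fmt"

// Simplified stand-ins for the circuit-breaker thresholds and connection-pool
// settings used above (illustrative fields only).
type Thresholds struct {
    MaxConnections     uint32
    MaxPendingRequests uint32
    MaxRequests        uint32
    MaxRetries         uint32
}

type ConnectionPool struct {
    TCPMaxConnections       uint32 // tcp.maxConnections
    HTTP1MaxPendingRequests uint32 // http.http1MaxPendingRequests
    HTTP2MaxRequests        uint32 // http.http2MaxRequests
    HTTPMaxRetries          uint32 // http.maxRetries
}

// applyPool mirrors the logic of applyConnectionPool: start from the defaults
// and only override fields that were explicitly set.
func applyPool(defaults Thresholds, cp ConnectionPool) Thresholds {
    t := defaults
    if cp.TCPMaxConnections > 0 {
        t.MaxConnections = cp.TCPMaxConnections
    }
    if cp.HTTP1MaxPendingRequests > 0 {
        t.MaxPendingRequests = cp.HTTP1MaxPendingRequests
    }
    if cp.HTTP2MaxRequests > 0 {
        t.MaxRequests = cp.HTTP2MaxRequests
    }
    if cp.HTTPMaxRetries > 0 {
        t.MaxRetries = cp.HTTPMaxRetries
    }
    return t
}

func main() {
    defaults := Thresholds{1024, 1024, 1024, 3} // hypothetical defaults
    cp := ConnectionPool{TCPMaxConnections: 100, HTTP2MaxRequests: 200}
    fmt.Printf("%+v\n", applyPool(defaults, cp))
    // {MaxConnections:100 MaxPendingRequests:1024 MaxRequests:200 MaxRetries:3}
}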
Detailed LDS initialization flow
The LDS push follows the call chain
DiscoveryServer::pushLds
-> DiscoveryServer::generateRawListeners
-> ConfigGenerator.BuildListeners
with the main implementation in BuildListeners.
Continuing with the HelloWorld example, only two relevant listener entries from envoy_listeners.json are kept below.
Related environment information:
# kubectl get pod -n istio-system -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE
helloworld-v1-8f8dd85-d59lh 2/2 Running 0 155m 10.128.5.4 node06 <none>
helloworld-v2-f9cf47df4-w9mfn 2/2 Running 0 155m 10.128.13.2 node04 <none>
# kubectl get svc -n istio-system -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
helloworld ClusterIP 10.0.40.71 <none> 5000/TCP 155m app=helloworld
# kubectl -n istio-system exec -ti helloworld-v1-8f8dd85-d59lh -c istio-proxy -- netstat -t -anlp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:5000 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15090 0.0.0.0:* LISTEN 23/envoy
tcp 0 0 127.0.0.1:15000 0.0.0.0:* LISTEN 23/envoy
tcp 0 0 0.0.0.0:15001 0.0.0.0:* LISTEN 23/envoy
Port | Service | Description |
---|---|---|
5000 | helloworld | application endpoint listening port |
15090 | static listener | exposes /stats/prometheus, forwarded to the 15000 admin port |
15000 | admin port | Envoy admin interface, bound to 127.0.0.1 only |
15001 | virtual listener | receives traffic redirected by iptables; unmatched traffic falls through to BlackHoleCluster |
The corresponding listeners can be seen being loaded in Envoy's log:
[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'
Pilot looks up the ports exposed by the services visible in the relevant namespaces and generates a listener entry for each of them, plus an extra entry for the local endpoint port; duplicate address/port combinations are collapsed into a single listener.
"listeners": {
"dynamic_active_listeners": [
// entry for the local helloworld endpoint at endpoint_ip:5000
{
"version_info": "2019-02-13T09:28:16Z/7",
"listener": {
"name": "10.128.5.4_5000",
"address": {
"socket_address": {
"address": "10.128.5.4",
"port_value": 5000
}
},
"filter_chains": [
],
"deprecated_v1": {
"bind_to_port": false
},
"listener_filters": [
{
"name": "envoy.listener.tls_inspector",
"config": {}
}
]
},
"last_updated": "2019-02-13T09:28:38.027Z"
},
// entry for helloworld on 0.0.0.0:5000
{
"version_info": "2019-02-15T05:44:18Z/6",
"listener": {
"name": "0.0.0.0_5000",
"address": {
"socket_address": {
"address": "0.0.0.0",
"port_value": 5000
}
},
"filter_chains": [
],
"deprecated_v1": {
"bind_to_port": false
}
},
"last_updated": "2019-02-15T05:44:18.648Z"
},
// similar entries are generated for ports exposed by other visible services
// the virtual listener on 15001 falls back to BlackHoleCluster
{
"version_info": "2019-02-13T09:28:16Z/7",
"listener": {
"name": "virtual",
"address": {
"socket_address": {
"address": "0.0.0.0",
"port_value": 15001
}
},
"filter_chains": [
{
"filters": [
{
"name": "envoy.tcp_proxy",
"config": {
"stat_prefix": "BlackHoleCluster",
"cluster": "BlackHoleCluster"
}
}
]
}
],
"use_original_dst": true
},
"last_updated": "2019-02-13T09:28:38.080Z"
},
]
}
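As a rough illustration of the de-duplication mentioned above (a single 0.0.0.0:port listener is shared by every service exposing that port), here is a minimal sketch; listenerEntry and addOutboundListener are illustrative names, not the actual code in buildSidecarOutboundListeners:

package main

import "fmt"

// listenerEntry sketches the de-duplication of outbound listeners: they are
// keyed by "bind-address_port", so services sharing 0.0.0.0:port share one
// listener.
type listenerEntry struct {
    name     string
    services []string
}

func addOutboundListener(listenerMap map[string]*listenerEntry, bind string, port int, svc string) {
    key := fmt.Sprintf("%s_%d", bind, port)
    if e, ok := listenerMap[key]; ok {
        e.services = append(e.services, svc) // reuse the existing listener
        return
    }
    listenerMap[key] = &listenerEntry{name: key, services: []string{svc}}
}

func main() {
    m := map[string]*listenerEntry{}
    addOutboundListener(m, "0.0.0.0", 5000, "helloworld.istio-system.svc.cluster.local")
    addOutboundListener(m, "0.0.0.0", 5000, "another-svc.istio-system.svc.cluster.local") // hypothetical second service on port 5000
    addOutboundListener(m, "0.0.0.0", 8080, "istio-pilot.istio-system.svc.cluster.local")
    fmt.Println(len(m), len(m["0.0.0.0_5000"].services)) // 2 listeners; the 5000 listener carries both services
}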
BuildListeners is implemented as follows:
// BuildListeners produces a list of listeners and referenced clusters for all proxies
func (configgen *ConfigGeneratorImpl) BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*xdsapi.Listener, error) {
switch node.Type {
case model.SidecarProxy:
// for a sidecar: look up the service endpoints on this node and build listeners from endpoint IP and port
return configgen.buildSidecarListeners(env, node, push)
case model.Router, model.Ingress:
return configgen.buildGatewayListeners(env, node, push)
}
return nil, nil
}
The main flow of buildSidecarListeners is:
// buildSidecarListeners produces a list of listeners for sidecar proxies
func (configgen *ConfigGeneratorImpl) buildSidecarListeners(env *model.Environment, node *model.Proxy,
push *model.PushContext) ([]*xdsapi.Listener, error) {
mesh := env.Mesh
proxyInstances, err := env.GetProxyServiceInstances(node)
if err != nil {
return nil, err
}
noneMode := node.GetInterceptionMode() == model.InterceptionNone
listeners := make([]*xdsapi.Listener, 0)
if mesh.ProxyListenPort > 0 {
// build the inbound listeners
inbound := configgen.buildSidecarInboundListeners(env, node, push, proxyInstances)
// build the outbound listeners
// buildSidecarOutboundListeners generates http and tcp listeners for
// outbound connections from the proxy based on the sidecar scope associated with the proxy.
// TODO(github.com/istio/pilot/issues/237)
//
// Sharing tcp_proxy and http_connection_manager filters on the same port for
// different destination services doesn't work with Envoy (yet). When the
// tcp_proxy filter's route matching fails for the http service the connection
// is closed without falling back to the http_connection_manager.
//
// Temporary workaround is to add a listener for each service IP that requires
// TCP routing
//
// Connections to the ports of non-load balanced services are directed to
// the connection's original destination. This avoids costly queries of instance
// IPs and ports, but requires that ports of non-load balanced service be unique.
outbound := configgen.buildSidecarOutboundListeners(env, node, push, proxyInstances)
listeners = append(listeners, inbound...)
listeners = append(listeners, outbound...)
// Do not generate any management port listeners if the user has specified a SidecarScope object
// with ingress listeners. Specifying the ingress listener implies that the user wants
// to only have those specific listeners and nothing else, in the inbound path.
generateManagementListeners := true
sidecarScope := node.SidecarScope
if sidecarScope != nil && sidecarScope.HasCustomIngressListeners ||
noneMode {
generateManagementListeners = false
}
if generateManagementListeners {
// ...
}
tcpProxy := &tcp_proxy.TcpProxy{
StatPrefix: util.BlackHoleCluster,
ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.BlackHoleCluster},
}
if mesh.OutboundTrafficPolicy.Mode == meshconfig.MeshConfig_OutboundTrafficPolicy_ALLOW_ANY {
// We need a passthrough filter to fill in the filter stack for orig_dst listener
tcpProxy = &tcp_proxy.TcpProxy{
StatPrefix: util.PassthroughCluster,
ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.PassthroughCluster},
}
}
var transparent *google_protobuf.BoolValue
if node.GetInterceptionMode() == model.InterceptionTproxy {
transparent = proto.BoolTrue
}
// add an extra listener that binds to the port that is the recipient of the iptables redirect
listeners = append(listeners, &xdsapi.Listener{
Name: VirtualListenerName,
Address: util.BuildAddress(WildcardAddress, uint32(mesh.ProxyListenPort)),
Transparent: transparent,
UseOriginalDst: proto.BoolTrue,
FilterChains: []listener.FilterChain{
{
Filters: []listener.Filter{
{
Name: xdsutil.TCPProxy,
ConfigType: &listener.Filter_Config{
Config: util.MessageToStruct(tcpProxy),
},
},
},
},
},
})
}
httpProxyPort := mesh.ProxyHttpPort
if httpProxyPort == 0 && noneMode { // make sure http proxy is enabled for 'none' interception.
httpProxyPort = int32(pilot.DefaultPortHTTPProxy)
}
// enable HTTP PROXY port if necessary; this will add an RDS route for this port
if httpProxyPort > 0 {
useRemoteAddress := false
traceOperation := http_conn.EGRESS
listenAddress := LocalhostAddress
opts := buildListenerOpts{
env: env,
proxy: node,
proxyInstances: proxyInstances,
bind: listenAddress,
port: int(httpProxyPort),
filterChainOpts: []*filterChainOpts{{
httpOpts: &httpListenerOpts{
rds: RDSHttpProxy,
useRemoteAddress: useRemoteAddress,
direction: traceOperation,
connectionManager: &http_conn.HttpConnectionManager{
HttpProtocolOptions: &core.Http1ProtocolOptions{
AllowAbsoluteUrl: proto.BoolTrue,
},
},
},
}},
bindToPort: true,
skipUserFilters: true,
}
l := buildListener(opts)
// TODO: plugins for HTTP_PROXY mode, envoyfilter needs another listener match for SIDECAR_HTTP_PROXY
// there is no mixer for http_proxy
mutable := &plugin.MutableObjects{
Listener: l,
FilterChains: []plugin.FilterChain{{}},
}
pluginParams := &plugin.InputParams{
ListenerProtocol: plugin.ListenerProtocolHTTP,
ListenerCategory: networking.EnvoyFilter_ListenerMatch_SIDECAR_OUTBOUND,
Env: env,
Node: node,
ProxyInstances: proxyInstances,
Push: push,
}
if err := buildCompleteFilterChain(pluginParams, mutable, opts); err != nil {
log.Warna("buildSidecarListeners ", err.Error())
} else {
listeners = append(listeners, l)
}
// TODO: need inbound listeners in HTTP_PROXY case, with dedicated ingress listener.
}
return listeners, nil
}
Detailed EDS initialization flow
The structure tracked in eds_clusters looks roughly like this:
cluster_name
  -> proxy.LocalityA -> eds_clusterA -> (node1, node2, ...)
  -> proxy.LocalityB -> eds_clusterB -> (node1, node2, ...)
The core structures, EdsCluster and ClusterLoadAssignment, are:
// EdsCluster tracks eds-related info for monitored clusters. In practice it'll include
// all clusters until we support on-demand cluster loading.
type EdsCluster struct {
// ...
LoadAssignment *xdsapi.ClusterLoadAssignment
// records which proxies are currently using this cluster
// EdsClients keeps track of all nodes monitoring the cluster.
EdsClients map[string]*XdsConnection `json:"-"`
}
type ClusterLoadAssignment struct {
// Name of the cluster. This will be the :ref:`service_name
// <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
// in the cluster :ref:`EdsClusterConfig
// <envoy_api_msg_Cluster.EdsClusterConfig>`.
ClusterName string
// List of endpoints to load balance to.
Endpoints []endpoint.LocalityLbEndpoints
// Map of named endpoints that can be referenced in LocalityLbEndpoints.
NamedEndpoints map[string]*endpoint.Endpoint
// Load balancing policy settings.
Policy *ClusterLoadAssignment_Policy
}
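In other words, the server keeps a two-level map from cluster name to proxy locality to EdsCluster, and each EdsCluster records the connections that are watching it. A simplified, self-contained sketch (types reduced to the fields relevant here, illustrative only):

package main

import "fmt"

// Illustrative, simplified stand-ins for the Pilot types above.
type Locality struct{ Region, Zone string }

type XdsConnection struct{ ConID string }

type EdsCluster struct {
    // EdsClients tracks the connections (proxies) watching this cluster.
    EdsClients map[string]*XdsConnection
}

// edsClusters: cluster name -> proxy locality -> EdsCluster
var edsClusters = map[string]map[Locality]*EdsCluster{}

// getOrAddEdsCluster mirrors the lookup/initialize behaviour sketched in the
// comments of the EDS handler below.
func getOrAddEdsCluster(name string, loc Locality) *EdsCluster {
    byLocality, ok := edsClusters[name]
    if !ok {
        byLocality = map[Locality]*EdsCluster{}
        edsClusters[name] = byLocality
    }
    c, ok := byLocality[loc]
    if !ok {
        c = &EdsCluster{EdsClients: map[string]*XdsConnection{}}
        byLocality[loc] = c
    }
    return c
}

func main() {
    loc := Locality{Region: "region-a", Zone: "zone-1"}
    c := getOrAddEdsCluster("outbound|5000||helloworld.istio-system.svc.cluster.local", loc)
    c.EdsClients["sidecar~10.128.5.4~helloworld-v1"] = &XdsConnection{ConID: "conn-1"}
    fmt.Println(len(edsClusters), len(c.EdsClients)) // 1 1
}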
The main handling of an EDS request looks like this:
case EndpointType:
// ...
// first remove this connection from the tracking lists of the clusters it previously watched
for _, cn := range con.Clusters {
s.removeEdsCon(cn, con.ConID, con)
}
// then register this connection as a watcher of every requested cluster
for _, cn := range clusters {
s.addEdsCon(cn, con.ConID, con)
// addEdsCon handles the case where the EdsCluster does not exist yet and initializes it
// -> s.getOrAddEdsCluster(connection.modelNode, clusterName)
// c := edsClusters[clusterName]
// if c == nil {
// c := &EdsCluster{
// discovery: s,
// EdsClients: map[string]*XdsConnection{},
// FirstUse: time.Now(),
// }
// edsClusters[clusterName] = map[model.Locality]*EdsCluster{
// proxy.Locality: c,
// }
// return c
// }
}
con.Clusters = clusters
adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
err := s.pushEds(s.globalPushContext(), con, true, nil)
if err != nil {
return err
}
// a config change occurred; trigger an (incremental) push if anything was updated
case pushEv := <-con.pushChannel:
// It is called when config changes.
// This is not optimized yet - we should detect what changed based on event and only
// push resources that need to be pushed.
// TODO: possible race condition: if a config change happens while the envoy
// was getting the initial config, between LDS and RDS, the push will miss the
// monitored 'routes'. Same for CDS/EDS interval.
// It is very tricky to handle due to the protocol - but the periodic push recovers
// from it.
err := s.pushConnection(con, pushEv)
if err != nil {
return nil
}
}
The pushEds function does the actual pushing:
// pushEds is pushing EDS updates for a single connection. Called the first time
// a client connects, for incremental updates and for full periodic updates.
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection,
full bool, edsUpdatedServices map[string]*EndpointShards) error {
loadAssignments := []*xdsapi.ClusterLoadAssignment{}
emptyClusters := 0
endpoints := 0
// iterate over the cluster names requested on this connection
for _, clusterName := range con.Clusters {
_, _, hostname, _ := model.ParseSubsetKey(clusterName)
if edsUpdatedServices != nil && edsUpdatedServices[string(hostname)] == nil {
// Cluster was not updated, skip recomputing.
continue
}
// look up the EdsCluster for this cluster name and the connection's node
c := s.getEdsCluster(con.modelNode, clusterName)
l := c.LoadAssignment
// if there is no LoadAssignment yet (fresh cluster), (re)compute it
if l == nil { // fresh cluster
edsClusters := map[model.Locality]*EdsCluster{
con.modelNode.Locality: c,
}
s.updateCluster(push, clusterName, edsClusters)
l = loadAssignment(c)
}
// If networks are set (by default they aren't) apply the Split Horizon
// EDS filter on the endpoints
// when split-horizon EDS by network is enabled, run an additional filtering pass
if s.Env.MeshNetworks != nil && len(s.Env.MeshNetworks.Networks) > 0 {
endpoints := EndpointsByNetworkFilter(l.Endpoints, con, s.Env)
endpoints = LoadBalancingWeightNormalize(endpoints)
filteredCLA := &xdsapi.ClusterLoadAssignment{
ClusterName: l.ClusterName,
Endpoints: endpoints,
Policy: l.Policy,
}
l = filteredCLA
}
endpoints += len(l.Endpoints)
loadAssignments = append(loadAssignments, l)
}
response := endpointDiscoveryResponse(loadAssignments)
return con.send(response)
}
Detailed RDS initialization flow
Combining this with the listener loading log from LDS:
[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'
In the RDS request, Envoy takes the listeners learned from LDS whose address is 0.0.0.0 (i.e. outbound ports) and uses their port numbers as the requested route_names sent to the ADS server, in this case "8080,9093,8060,5000,15010", five routes in total. Of the response, only the helloworld port 5000 route is shown:
"dynamic_route_configs": [
{
"version_info": "2019-02-15T05:53:49Z/8",
"route_config": {
"name": "5000",
"virtual_hosts": [
{
"name": "helloworld.istio-system.svc.cluster.local:5000",
"domains": [
"helloworld.istio-system.svc.cluster.local",
"helloworld.istio-system.svc.cluster.local:5000",
"helloworld",
"helloworld:5000",
"helloworld.istio-system.svc.cluster",
"helloworld.istio-system.svc.cluster:5000",
"helloworld.istio-system.svc",
"helloworld.istio-system.svc:5000",
"helloworld.istio-system",
"helloworld.istio-system:5000",
"10.0.40.71",
"10.0.40.71:5000"
],
"routes": [
{
"match": {
"prefix": "/"
},
"route": {
"weighted_clusters": {
"clusters": [
{
"name": "outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
"weight": 90,
"per_filter_config": {
}
},
{
"name": "outbound|5000|v2|helloworld.istio-system.svc.cluster.local",
"weight": 10,
"per_filter_config": {
}
}
]
},
"timeout": "0s",
"max_grpc_timeout": "0s"
},
"decorator": {
"operation": "helloworld:5000/*"
},
"per_filter_config": {
"mixer": {
"disable_check_calls": true
}
}
}
]
}
],
"validate_clusters": false
},
"last_updated": "2019-02-15T05:53:49.910Z"
},
]
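The weighted_clusters block above is what produces the 90/10 traffic split between the v1 and v2 subsets: each request is routed to one of the clusters with probability proportional to its weight. A rough sketch of weighted selection, not Envoy's actual implementation:

package main

import (
    "fmt"
    "math/rand"
)

// weightedCluster pairs a cluster name with its routing weight.
type weightedCluster struct {
    name   string
    weight int
}

// weightedPick returns a cluster name with probability proportional to its weight.
func weightedPick(clusters []weightedCluster, r *rand.Rand) string {
    total := 0
    for _, c := range clusters {
        total += c.weight
    }
    n := r.Intn(total)
    for _, c := range clusters {
        if n < c.weight {
            return c.name
        }
        n -= c.weight
    }
    return clusters[len(clusters)-1].name
}

func main() {
    clusters := []weightedCluster{
        {"outbound|5000|v1|helloworld.istio-system.svc.cluster.local", 90},
        {"outbound|5000|v2|helloworld.istio-system.svc.cluster.local", 10},
    }
    r := rand.New(rand.NewSource(1))
    counts := map[string]int{}
    for i := 0; i < 1000; i++ {
        counts[weightedPick(clusters, r)]++
    }
    fmt.Println(counts) // roughly 900 requests to v1 and 100 to v2
}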
The RDS handling entry point in Pilot:
case RouteType:
routes := discReq.GetResourceNames()
// ...
if sortedRoutes == nil {
sort.Strings(routes)
sortedRoutes = routes
}
con.Routes = sortedRoutes
adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
err := s.pushRoute(con, s.globalPushContext())
pushRoute is implemented as follows:
func (s *DiscoveryServer) pushRoute(con *XdsConnection, push *model.PushContext) error {
rawRoutes, err := s.generateRawRoutes(con, push)
if err != nil {
return err
}
response := routeDiscoveryResponse(rawRoutes)
err = con.send(response)
// ...
return err
}
generateRawRoutes mainly calls ConfigGenerator.BuildHTTPRoutes to prepare the data:
func (s *DiscoveryServer) generateRawRoutes(con *XdsConnection, push *model.PushContext) ([]*xdsapi.RouteConfiguration, error) {
rc := make([]*xdsapi.RouteConfiguration, 0)
for _, routeName := range con.Routes {
r, err := s.ConfigGenerator.BuildHTTPRoutes(s.Env, con.modelNode, push, routeName)
if err != nil {
return nil, err
}
// validate and append the generated route configuration
rc = append(rc, r)
}
return rc, nil
}
BuildHTTPRoutes dispatches to the appropriate builder according to the proxy type (model.SidecarProxy versus model.Router/model.Ingress):
// BuildHTTPRoutes produces a list of routes for the proxy
func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext,
routeName string) (*xdsapi.RouteConfiguration, error) {
proxyInstances, err := env.GetProxyServiceInstances(node)
if err != nil {
return nil, err
}
switch node.Type {
case model.SidecarProxy:
return configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, proxyInstances, routeName), nil
case model.Router, model.Ingress:
return configgen.buildGatewayHTTPRouteConfig(env, node, push, proxyInstances, routeName)
}
return nil, nil
}
The final step builds the Route configuration from the routeName together with the matching VirtualService resources; the resulting format was shown in the dump above:
// buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
// Based on port, will determine all virtual hosts that listen on the port.
func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
proxyInstances []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {
listenerPort, err := strconv.Atoi(routeName)
if err != nil {
// routeName may not be a port (e.g. the http_proxy route); error handling elided
}
var virtualServices []model.Config
var services []*model.Service
// Get the list of services that correspond to this egressListener from the sidecarScope
sidecarScope := node.SidecarScope
// sidecarScope should never be nil
if sidecarScope != nil && sidecarScope.Config != nil {
// build services and virtual services from the sidecar's egress listener configuration
} else { // otherwise take everything visible through the default mesh gateway
meshGateway := map[string]bool{model.IstioMeshGateway: true}
services = push.Services(node)
virtualServices = push.VirtualServices(node, meshGateway)
}
nameToServiceMap := make(map[model.Hostname]*model.Service)
for _, svc := range services {
if listenerPort == 0 {
nameToServiceMap[svc.Hostname] = svc
} else {
if svcPort, exists := svc.Ports.GetByPort(listenerPort); exists {
nameToServiceMap[svc.Hostname] = &model.Service{
Hostname: svc.Hostname,
Address: svc.Address,
MeshExternal: svc.MeshExternal,
Ports: []*model.Port{svcPort},
}
}
}
}
// Collect all proxy labels for source match
var proxyLabels model.LabelsCollection
for _, w := range proxyInstances {
proxyLabels = append(proxyLabels, w.Labels)
}
// Get list of virtual services bound to the mesh gateway
virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap, proxyLabels, virtualServices, listenerPort)
vHostPortMap := make(map[int][]route.VirtualHost)
for _, virtualHostWrapper := range virtualHostWrappers {
virtualHosts := make([]route.VirtualHost, 0, len(virtualHostWrapper.VirtualServiceHosts)+len(virtualHostWrapper.Services))
for _, host := range virtualHostWrapper.VirtualServiceHosts {
virtualHosts = append(virtualHosts, route.VirtualHost{
Name: fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port),
Domains: []string{host, fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port)},
Routes: virtualHostWrapper.Routes,
})
}
for _, svc := range virtualHostWrapper.Services {
virtualHosts = append(virtualHosts, route.VirtualHost{
Name: fmt.Sprintf("%s:%d", svc.Hostname, virtualHostWrapper.Port),
Domains: generateVirtualHostDomains(svc, virtualHostWrapper.Port, node),
Routes: virtualHostWrapper.Routes,
})
}
vHostPortMap[virtualHostWrapper.Port] = append(vHostPortMap[virtualHostWrapper.Port], virtualHosts...)
}
var virtualHosts []route.VirtualHost
if listenerPort == 0 {
virtualHosts = mergeAllVirtualHosts(vHostPortMap)
} else {
virtualHosts = vHostPortMap[listenerPort]
}
util.SortVirtualHosts(virtualHosts)
out := &xdsapi.RouteConfiguration{
Name: routeName,
VirtualHosts: virtualHosts,
ValidateClusters: proto.BoolFalse,
}
// call plugins
for _, p := range configgen.Plugins {
in := &plugin.InputParams{
ListenerProtocol: plugin.ListenerProtocolHTTP,
Env: env,
Node: node,
Push: push,
}
p.OnOutboundRouteConfiguration(in, out)
}
return out
}
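The long domains list in the RDS dump above (every shortened form of the service FQDN, with and without the port, plus the cluster VIP) comes from generateVirtualHostDomains. The following is a rough, self-contained sketch of that expansion; it ignores ordering and the proxy DNS-domain edge cases handled by the real code:

package main

import (
    "fmt"
    "strings"
)

// domainVariants sketches how the "domains" list above can be derived:
// every shortened form of the FQDN plus the cluster VIP, each listed with and
// without the service port.
func domainVariants(fqdn string, port int, vip string) []string {
    bases := []string{fqdn}
    parts := strings.Split(fqdn, ".")
    // helloworld.istio-system.svc.cluster -> ...svc -> helloworld.istio-system -> helloworld
    for i := len(parts) - 1; i >= 1; i-- {
        bases = append(bases, strings.Join(parts[:i], "."))
    }
    bases = append(bases, vip)

    var out []string
    for _, b := range bases {
        out = append(out, b, fmt.Sprintf("%s:%d", b, port))
    }
    return out
}

func main() {
    for _, d := range domainVariants("helloworld.istio-system.svc.cluster.local", 5000, "10.0.40.71") {
        fmt.Println(d) // prints the 12 domain variants seen in the RDS dump
    }
}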
Pilot connection log when Envoy restarts
The xDS requests arrive in the order CDS -> EDS -> LDS -> RDS:
2019-02-16T06:27:14.131219Z info ads ADS:CDS: REQ 10.128.69.0:48816 sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 66.451µs raw: node:<id:"sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local" cluster:"helloworld" metadata:<fields:<key:"INTERCEPTION_MODE" value:<string_value:"REDIRECT" > > fields:<key:"ISTIO_PROXY_SHA" value:<string_value:"istio-proxy:930841ca88b15365737acb7eddeea6733d4f98b9" > > fields:<key:"ISTIO_PROXY_VERSION" value:<string_value:"1.0.2" > > fields:<key:"ISTIO_VERSION" value:<string_value:"1.0.5" > > fields:<key:"POD_NAME" value:<string_value:"helloworld-v1-8f8dd85-f99wk" > > fields:<key:"app" value:<string_value:"helloworld" > > fields:<key:"istio" value:<string_value:"sidecar" > > fields:<key:"version" value:<string_value:"v1" > > > build_version:"0/1.8.0-dev//RELEASE" > type_url:"type.googleapis.com/envoy.api.v2.Cluster"
2019-02-16T06:27:14.131444Z info ads CDS: PUSH 2019-02-15T05:53:49Z/8 for helloworld-v1-8f8dd85-f99wk.istio-system "10.128.69.0:48816", Clusters: 11, Services 3
2019-02-16T06:27:14.141574Z info ads EDS: PUSH for sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 clusters 9 endpoints 9 empty 0
2019-02-16T06:27:14.142920Z info Uses TLS multiplexing for helloworld.istio-system.svc.cluster.local {http 5000 HTTP}
2019-02-16T06:27:14.149656Z info ads LDS: PUSH for node:helloworld-v1-8f8dd85-f99wk.istio-system addr:"10.128.69.0:48816" listeners:8 10981
2019-02-16T06:27:14.163397Z info ads ADS: RDS: PUSH for node: helloworld-v1-8f8dd85-f99wk.istio-system addr:10.128.69.0:48816 routes:5 ver:2019-02-15T05:53:49Z/8
Envoy-side log:
2019-02-16T06:27:13.308594Z info Version root@6f6ea1061f2b-docker.io/istio-1.0.5-c1707e45e71c75d74bf3a5dec8c7086f32f32fad-Clean
2019-02-16T06:27:13.308678Z info Proxy role: model.Proxy{ClusterID:"", Type:"sidecar", IPAddress:"10.128.69.4", ID:"helloworld-v1-8f8dd85-f99wk.istio-system", Domain:"istio-system.svc.cluster.local", Metadata:map[string]string(nil)}
2019-02-16T06:27:13.309241Z info Effective config: binaryPath: /usr/local/bin/envoy
configPath: /etc/istio/proxy
connectTimeout: 10s
discoveryAddress: istio-pilot.istio-system:15007
discoveryRefreshDelay: 1s
drainDuration: 45s
parentShutdownDuration: 60s
proxyAdminPort: 15000
serviceCluster: helloworld
zipkinAddress: zipkin.istio-system:9411
2019-02-16T06:27:13.309274Z info Monitored certs: []envoy.CertSource{envoy.CertSource{Directory:"/etc/certs/", Files:[]string{"cert-chain.pem", "key.pem", "root-cert.pem"}}}
2019-02-16T06:27:13.309457Z info Starting proxy agent
2019-02-16T06:27:13.309573Z info Received new config, resetting budget
2019-02-16T06:27:13.310745Z info Reconciling configuration (budget 10)
2019-02-16T06:27:13.310805Z info Epoch 0 starting
2019-02-16T06:27:13.311883Z info Envoy command: [-c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster helloworld --service-node sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local --max-obj-name-len 189 --allow-unknown-fields -l warn --v2-config-only]
// ...
[2019-02-16 06:27:13.347][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:130] cm init: initializing cds
[2019-02-16 06:27:14.133][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15010||istio-pilot.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.134][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15011||istio-pilot.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.134][24][info][upstream]
external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8080||istio-pilot.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.135][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-pilot.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.136][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8060||istio-citadel.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.137][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-citadel.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000||helloworld.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v1|helloworld.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.139][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v2|helloworld.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.140][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster inbound|5000||helloworld.istio-system.svc.cluster.local during init
[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster BlackHoleCluster during init
[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:111] cm init: initializing secondary clusters
[2019-02-16 06:27:14.142][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:134] cm init: all clusters initialized
[2019-02-16 06:27:14.142][24][info][main] external/envoy/source/server/server.cc:401] all clusters initialized. initializing init manager
[2019-02-16 06:27:14.155][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[2019-02-16 06:27:14.156][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[2019-02-16 06:27:14.157][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[2019-02-16 06:27:14.158][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[2019-02-16 06:27:14.159][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[2019-02-16 06:27:14.161][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener 'virtual'
[2019-02-16 06:27:14.165][24][info][config] external/envoy/source/server/listener_manager_impl.cc:908] all dependencies initialized. starting workers
configScope
This was implemented in the Istio 1.1 code base; see PR 10278. Earlier versions implemented it with an annotation placed on the Service:
// ServiceConfigScopeAnnotation configs the scope the service visible to.
// "PUBLIC" which is the default, indicates it is reachable within the mesh
// "PRIVATE" indicates it is reachable within its namespace
ServiceConfigScopeAnnotation = "networking.istio.io/configScope"
ServiceEntry, VirtualService, Gateway, DestinationRule, etc. can all set their visibility via spec.configScope. ConfigScope can be set to "PUBLIC" or "PRIVATE":
- "PUBLIC" means the rule is visible to all workloads in the mesh; this is the default.
- "PRIVATE" means the rule is visible only to workloads in the same namespace.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: virtual-service-scope-private
spec:
  configScope: PRIVATE
  hosts:
  - "bookinfo.com"
  http:
  - route:
    - destination:
        host: "bookinfo.com"
      headers:
        request:
          add:
            scope: private