Prometheus Discovery 之 K8S 代码分析

Service Discovery interface

Service Discovery 必须实现 Discovery 接口,定义如下:

type Discoverer interface {
   Run(ctx context.Context, ch chan<- []*targetgroup.Group)
}

Prometheus 支持了众多的 SD 发现机制,代码位于 discovery 目录下。

Group 的结构定义如下:

// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
    // Targets is a list of targets identified by a label set. Each target is
    // uniquely identifiable in the group by its address label.
    Targets []model.LabelSet
    // Labels is a set of labels that is common across all targets in the group.
    Labels model.LabelSet

    // Source is an identifier that describes a group of targets.
    Source string
}

K8S 的 Discoverer 定义在文件 gprometheus/discovery/kubernetes/kubernetes.go 中。

在 init 函数中注册了metrics prometheus_sd_kubernetes_events_total,用于分析发现过程中的事件接受数量:

func init() {
    prometheus.MustRegister(eventCount)

    // Initialize metric vectors.
    for _, role := range []string{"endpoints", "node", "pod", "service"} {
        for _, evt := range []string{"add", "delete", "update"} {
            eventCount.WithLabelValues(role, evt)
        }
    }
}

第一次可以全量的将全部事件发送到接口中定义的 ch 中,后续的更新事件,只需要发送更新的事件信息内容即可,如果信息被删除了则可以发送一个为空的事件内容(包含 Source),所有事件通过 Source 字段作为唯一 key。 

// Discovery implements the discoverer interface for discovering
// targets from Kubernetes.
// 每个 role 会启动一个单独的 Discovery 进行跟踪
type Discovery struct {
    sync.RWMutex
    client             kubernetes.Interface  // 连接到 k8s 的 client
    role               Role
    logger             log.Logger
    namespaceDiscovery *NamespaceDiscovery // 保存需要监控的 namespace 
    discoverers        []discoverer        // 每个 role 按照 namespace 进行划分,单独跟踪的 sd
}

其中 discoverers 分不同的 namespace,每个 namespace 会单独起一个内部的 discoverer 来进行单独的跟踪。

// This is only for internal use.
type discoverer interface {
    Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

初始化和运行

main 函数入口

func main() {
    // ...
    // 初始化 
    discoveryManagerScrape  = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"))

    // Notify 的作用待定?
    discoveryManagerNotify  = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"))

    // ...

    reloaders := []func(cfg *config.Config) error{
            // JobName 为 key ,对应到相关配置
            // v.JobName] = v.ServiceDiscoveryConfig
                func(cfg *config.Config) error {
            c := make(map[string]sd_config.ServiceDiscoveryConfig)
            for _, v := range cfg.ScrapeConfigs {
                c[v.JobName] = v.ServiceDiscoveryConfig
            }
            return discoveryManagerScrape.ApplyConfig(c)
        },
        // ...

     {
        // Scrape discovery manager.
        g.Add(
            func() error {
               // 调用 Run 启动
                err := discoveryManagerScrape.Run()
                level.Info(logger).Log("msg", "Scrape discovery manager stopped")
                return err
            },
            func(err error) {
                level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
                cancelScrape()
            },
        )
    }

   if err := g.Run(); err != nil {
        level.Error(logger).Log("err", err)
        os.Exit(1)
    }
}

整个 DS 的入口在 prometheus/discovery/manager.go

NewManager 函数用于生成 DS Mgr 对象:

// NewManager is the Discovery Manager constructor
func NewManager(ctx context.Context, logger log.Logger) *Manager {
   if logger == nil {
      logger = log.NewNopLogger()
   }
   return &Manager{
      logger:         logger,
      syncCh:         make(chan map[string][]*targetgroup.Group),
      targets:        make(map[poolKey]map[string]*targetgroup.Group),
      discoverCancel: []context.CancelFunc{},
      ctx:            ctx,
   }
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    m.cancelDiscoverers()
    // 对于配置文件组织管理
    for name, scfg := range cfg {
        m.registerProviders(scfg, name)
    }

    // 全部启动
    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }

    return nil
}

将每个 JobName 为 key 的结构进行注册管理

// provider 用于管理可能相同配置的不同 job 任务
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
    name   string       // "kubernetes_sd_configs/[0-n]"
    d      Discoverer   // 对应的 Discoverer 
    subs   []string     // 配置相关情况下的,可能是不同的 JobName
    config interface{}  // 对应的相关配置
}

Manager 结构如下:

// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
   logger         log.Logger
   mtx            sync.RWMutex
   ctx            context.Context
   discoverCancel []context.CancelFunc

   // Some Discoverers(eg. k8s) send only the updates for a given target group
   // so we use map[tg.Source]*targetgroup.Group to know which group to update.
    // poolkey: job_name + provider_n, 将事件放到各个 job 任务的队列中
   targets map[poolKey]map[string]*targetgroup.Group 
   // providers keeps track of SD providers.
   providers []*provider
   // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
    // 经过聚合后,将需要更新的对象信息发送到 jobName 的队列中
   syncCh chan map[string][]*targetgroup.Group
}

registerProviders:

func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string){
    // setName 为 JobName
    add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
        t := reflect.TypeOf(cfg).String() // kubernetes_sd_configs
        for _, p := range m.providers {
            if reflect.DeepEqual(cfg, p.config) {
                p.subs = append(p.subs, setName)
                return
            }
        }

        // call kubernetes.New(log.With(m.logger, "discovery", "k8s"), cfg)
        d, err := newDiscoverer()
        provider := provider{
            // t = "kubernetes_sd_configs"
            name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
            d:      d,
            config: cfg,
            subs:   []string{setName},
        }
        m.providers = append(m.providers, &provider)
    }

    // ...
    // 循环处理 k8s 相关的配置
    for _, c := range cfg.KubernetesSDConfigs {
        add(c, func() (Discoverer, error) {
            return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
        })
    }
    // ...

整体结构如下:

Manager --> []provider -> provider[k8s/0] -->  Discovery ->  [roleA]discoverys -> ns1, ns2 
                          provider[k8s/n]                ->  [roleB]discoverys -> ns1, ns2
                          provider[xxx/n]         discovery: Service, Endpoints, Service..

启动:

func (m *Manager) startProvider(ctx context.Context, p *provider) {
   level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
   ctx, cancel := context.WithCancel(ctx)

   // updates 为 SD 对外输出目标的通道,需要重点关注
   updates := make(chan []*targetgroup.Group)

   m.discoverCancel = append(m.discoverCancel, cancel)

   go p.d.Run(ctx, updates) // 循环启动
   go m.updater(ctx, p, updates)
}

 updater(ctx, p, updates) 负责从 SD 发送的通道中读取数据:

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
   ticker := time.NewTicker(5 * time.Second)
   defer ticker.Stop()

   triggerUpdate := make(chan struct{}, 1)

   for {
      select {
      case <-ctx.Done():
         return
      case tgs, ok := <-updates:
         if !ok {
            level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name)
            select {
            case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update.
               level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name)
               return
            case <-ctx.Done():
               return
            }

         }

          // s: job_name, provider: k8s/n
          // 针对可能的订阅者(包括相同配置的订阅者) 发送事件
         for _, s := range p.subs {
            m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
         }

         select {
         case triggerUpdate <- struct{}{}:
         default:
         }
      case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
         select {
         case <-triggerUpdate:
            select {
                // m.allGroups() 按照 pkey.SetName (job_name 进行聚合)
            case m.syncCh <- m.allGroups(): 
            default:
               level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name)
               select {
               case triggerUpdate <- struct{}{}:
               default:
               }
            }
         default:
         }
      }
   }
}

 读取 channel 数据后,放入到相对应的 poolkey 中进行更新

func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    for _, tg := range tgs {
        if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
            if _, ok := m.targets[poolKey]; !ok {
                m.targets[poolKey] = make(map[string]*targetgroup.Group)
            }
            // poolkey: job_name + provider_n, 
            m.targets[poolKey][tg.Source] = tg
        }
    }
}

最终 Manager 将数据汇总到了 syncCh chan map[string][]*targetgroup.Group 中的定义的 JobName 对应的队列中,通过其 SyncCh 函数将该通道返回出去

处理更新后的事件

main 函数中,scrapeManager 负责从 DS Manager 的输出队列中读取数据:

int main(){

    // ...
    {
        // Scrape manager.
        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                // Scrape manager needs to be stopped before closing the local TSDB
                // so that it doesn't try to write samples to a closed storage.
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )
    }

    // ...
}

scraple/manager.go 中定义:

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager.
type Manager struct {
    logger    log.Logger
    append    Appendable
    graceShut chan struct{}

    mtxTargets     sync.Mutex // Guards the fields below.
    targetsActive  []*Target
    targetsDropped []*Target
    targetsAll     map[string][]*Target

    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
}

// Run starts background processing to handle target updates and reload the scraping loops.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    for {
        select {
        case ts := <-tsets:
            m.reload(ts)
        case <-m.graceShut:
            return nil
        }
    }
}

reload 函数定义如下:

func (m *Manager) reload(t map[string][]*targetgroup.Group) {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    tDropped := make(map[string][]*Target)
    tActive := make(map[string][]*Target)

    for tsetName, tgroup := range t {
        var sp *scrapePool
        if existing, ok := m.scrapePools[tsetName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[tsetName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
                continue
            }
            sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
            m.scrapePools[tsetName] = sp
        } else {
            sp = existing
        }

        // Sync 函数中用于过滤相关符合条件的 Target
        tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
    }

    // 更新获取和丢弃的目标,可以通过界面查询到对应的结果 
    m.targetsUpdate(tActive, tDropped)
}

sync 定义如下:

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
// 同步将目标组转换为实际的抓取目标,并将当前运行的抓取与结果集同步,并返回所有刮取和删除的目标。
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
    start := time.Now()

    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        // targetsFromGroup 函数通过相关配置完成转换
        targets, err := targetsFromGroup(tg, sp.config)

        for _, t := range targets {
            // 返回目标的标签,不是处理后的,不以 “__” 为前缀
            if t.Labels().Len() > 0 { // 不存在以 “__" 为前缀匹配的标签
                all = append(all, t)
            } else if t.DiscoveredLabels().Len() > 0 {
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.mtx.Unlock()
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()

    sp.mtx.RLock()
    for _, t := range sp.targets {
        tActive = append(tActive, t)
    }
    tDropped = sp.droppedTargets
    sp.mtx.RUnlock()

    return tActive, tDropped
}

targetsFromGroup函数根据输入的对象信息和相关配置完成过滤的整体工作:

// targetsFromGroup builds targets based on the given TargetGroup and config.
// targetsFromGroup 根据给定的 TargetGroup 和 config 构建目标
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
    targets := make([]*Target, 0, len(tg.Targets))

    for i, tlset := range tg.Targets {
        // 将每个数组中的标签和通用标签合并进行过滤
        lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))

        // 合并发现的标签
        for ln, lv := range tlset {
            lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
        }

        // 合并通用标签
        for ln, lv := range tg.Labels {
            if _, ok := tlset[ln]; !ok {
                lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
            }
        }

        // 复制一份合并后的标签列表
        lset := labels.New(lbls...)

        // 根据 cfg 配置来进行过滤, lset 为本次的全量标签; lbls 为根据配置处理后的标签集合,origLabels 为处理之前的原始标签集
        lbls, origLabels, err := populateLabels(lset, cfg)

        // 如果 lbls 或者 origLabels 有一个不为空,则加入
        if lbls != nil || origLabels != nil {    // cfg.Params 配置中添加到 url 后的参数
            targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
        }
    }
    return targets, nil
}

过滤后的 Target 结构定义如下:

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
    // Labels before any processing.
    discoveredLabels labels.Labels

    // Any labels that are added to this target and its metrics.
    labels labels.Labels

    // Additional URL parmeters that are part of the target URL.
    params url.Values

    mtx        sync.RWMutex
    lastError  error
    lastScrape time.Time
    health     TargetHealth
    metadata   metricMetadataStore
}

populateLabels 根据给定的标签集和 scrape 配置构建标签集。
会在重新标记应用之前返回标签集作为第二个返回值。
如果在重新标记期间丢弃目标,则返回在应用重新标记之前找到的原始发现标签集。

// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
// 函数根据给定的 labels 和 相关配置的选项,来进行 relabel 的处理,返回的第一个参数为匹配后的结果集,第二个参数返回应用之前的 labels, 如果第一个参数为空,则表示该目标被丢失,比如 action:drop 
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
   // Copy labels into the labelset for the target if they are not set already.
   scrapeLabels := []labels.Label{
      {Name: model.JobLabel, Value: cfg.JobName},
      {Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
      {Name: model.SchemeLabel, Value: cfg.Scheme},
   }
   lb := labels.NewBuilder(lset)

   for _, l := range scrapeLabels {
      if lv := lset.Get(l.Name); lv == "" {
         lb.Set(l.Name, l.Value)
      }
   }
   // Encode scrape query parameters as labels.
   for k, v := range cfg.Params {
      if len(v) > 0 {
         lb.Set(model.ParamLabelPrefix+k, v[0])
      }
   }

   preRelabelLabels := lb.Labels()
   // relabel.Process 进程返回给定标签集的重新标记的副本。 relabel 按输入顺序应用。
   // 如果删除标签集,则返回nill
   // 可以返回修改的输入 labelSet。
   // Process 会自动添加 job 和 instance 两个lable, 如果 lset 为空这说明不是监控的目标
   lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)

   // Check if the target was dropped.
   if lset == nil {
      return nil, preRelabelLabels, nil
   }
   if v := lset.Get(model.AddressLabel); v == "" {
      return nil, nil, fmt.Errorf("no address")
   }

   lb = labels.NewBuilder(lset)

   // addPort checks whether we should add a default port to the address.
   // If the address is not valid, we don't append a port either.
   addPort := func(s string) bool {
      // If we can split, a port exists and we don't have to add one.
      if _, _, err := net.SplitHostPort(s); err == nil {
         return false
      }
      // If adding a port makes it valid, the previous error
      // was not due to an invalid address and we can append a port.
      _, _, err := net.SplitHostPort(s + ":1234")
      return err == nil
   }
   addr := lset.Get(model.AddressLabel)
   // If it's an address with no trailing port, infer it based on the used scheme.
   if addPort(addr) {
      // Addresses reaching this point are already wrapped in [] if necessary.
      switch lset.Get(model.SchemeLabel) {
      case "http", "":
         addr = addr + ":80"
      case "https":
         addr = addr + ":443"
      default:
         return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
      }
      lb.Set(model.AddressLabel, addr)
   }

   if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
      return nil, nil, err
   }

   // Meta labels are deleted after relabelling. Other internal labels propagate to
   // the target which decides whether they will be part of their label set.
   for _, l := range lset {
      if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
         lb.Del(l.Name)
      }
   }

   // Default the instance label to the target address.
   if v := lset.Get(model.InstanceLabel); v == "" {
      lb.Set(model.InstanceLabel, addr)
   }

   res = lb.Labels()
   for _, l := range res {
      // Check label values are valid, drop the target if not.
      if !model.LabelValue(l.Value).IsValid() {
         return nil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
      }
   }
   return res, preRelabelLabels, nil
}

关于 prometheus_client 相关的测试样例参见: https://github.com/DavadDi/Kubernetes_study/tree/master/prometheus_client

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注