gRPC LoadBalancer

For a grpclb implementation with Etcd-based service registration and discovery, see [grpclb](https://github.com/DavadDi/grpclb).

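Background

gRPC load balancing works per call rather than per connection, which keeps the load on the servers evenly spread. Load balancers generally fall into two kinds, depending on where they are implemented: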

1. Proxy

2. Client Side

Thick Client

All of the load-balancing logic is implemented in the client.

Lookaside Load Balancing

Also known as one-arm load balancing.

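Etcd LoadBalancer Implementation

The latest gRPC release at the time of writing is 1.14.0. The Balancer implementation differs between Etcd versions; see client-architecture/ for Etcd v3.4.0 (TBD 2018-09). Depending on the combination of client and gRPC version, there are the following approaches: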

  • clientv3-grpc1.0
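    • The client keeps connections to several server nodes at the same time, so when an error occurs it can quickly retry on another connection.

    • Limitation: keeping multiple connections open wastes resources; the client knows nothing about node health or cluster membership, so it may not work properly when a node fails.

  • clientv3-grpc1.7 (optional approach)

    • The client keeps a connection to a single node of the cluster. When the cluster has multiple endpoints, it tries to connect to all of them; once one connection is chosen (the pinned address), it closes the others and keeps using the pinned one until that connection is closed. If an error occurs during a call, the client enters its error-handling flow.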

    (Figures: client-architecture-balancer-figure-02 through client-architecture-balancer-figure-06)

    • Limitation: heartbeats rely on HTTP/2 keepalives, and split-brain situations can occur.


  • clientv3-grpc1.14 (v3.4.0, under development)
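
The pinned-address behaviour described for clientv3-grpc1.7 can be sketched roughly as follows. This is only an illustration under assumptions: `conn`, `dialFunc`, `pin` and `errDown` are hypothetical names invented here, not Etcd or gRPC APIs, and a real client would open gRPC transports instead of fake connections.

```go
package main

import (
	"errors"
	"fmt"
)

// errDown is a hypothetical error used by the fake dialer below.
var errDown = errors.New("endpoint down")

// conn is a hypothetical stand-in for a real network connection.
type conn struct {
	addr   string
	closed bool
}

func (c *conn) Close() { c.closed = true }

// dialFunc is a hypothetical dialer; a real client would dial gRPC here.
type dialFunc func(addr string) (*conn, error)

// pin dials every endpoint concurrently, keeps the first connection that
// succeeds (the pinned address), and closes all the other connections.
func pin(endpoints []string, dial dialFunc) (*conn, error) {
	type result struct {
		c   *conn
		err error
	}
	results := make(chan result, len(endpoints))
	for _, ep := range endpoints {
		go func(addr string) {
			c, err := dial(addr)
			results <- result{c, err}
		}(ep)
	}
	var pinned *conn
	for range endpoints {
		r := <-results
		switch {
		case r.err != nil:
			continue // this endpoint is unreachable, try the next result
		case pinned == nil:
			pinned = r.c // first successful connection wins
		default:
			r.c.Close() // a connection is already pinned, drop this one
		}
	}
	if pinned == nil {
		return nil, errors.New("no endpoint reachable")
	}
	return pinned, nil
}

func main() {
	dial := func(addr string) (*conn, error) {
		if addr == "10.0.0.2:2379" {
			return &conn{addr: addr}, nil
		}
		return nil, errDown
	}
	c, err := pin([]string{"10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"}, dial)
	fmt.Println(c.addr, err)
}
```

The sketch stops at pinning; the error-handling flow that runs when the pinned connection fails is not modelled here.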

gRPC 1.14 LB Analysis

dnsResolver

Registering dnsResolver:

func init() {
   resolver.Register(NewBuilder())
}

The resolver Builder interface. A Builder creates a resolver that watches for name resolution updates.

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
   // Build creates a new resolver for the given target.
   //
   // gRPC dial calls Build synchronously, and fails if the returned error is
   // not nil.
   Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
   // Scheme returns the scheme supported by this resolver.
   // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
   Scheme() string
}

The Resolver interface is defined as follows:

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
    // ResolveNow will be called by gRPC to try to resolve the target name
    // again. It's just a hint, resolver can ignore this if it's not necessary.
    //
    // It could be called multiple times concurrently.
    ResolveNow(ResolveNowOption)
    // Close closes the resolver.
    Close()
}
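
To make the Builder/Resolver relationship concrete, here is a minimal sketch of a resolver that always reports a fixed address list. The interfaces are simplified local copies (strings instead of `resolver.Target` and `resolver.Address`, no options) so that the block runs with the standard library alone; `staticBuilder`, `staticResolver`, `recordingConn` and the `registry` map are hypothetical names, not part of grpc-go.

```go
package main

import "fmt"

// Simplified local stand-ins for the interfaces quoted above (assumption:
// trimmed to what this sketch needs).
type ClientConn interface {
	NewAddress(addrs []string)
}

type Resolver interface {
	ResolveNow()
	Close()
}

type Builder interface {
	Build(target string, cc ClientConn) (Resolver, error)
	Scheme() string
}

// registry maps a scheme to its Builder, which is conceptually what
// registering a resolver builder does.
var registry = map[string]Builder{}

func Register(b Builder) { registry[b.Scheme()] = b }

// staticBuilder is a hypothetical builder with a fixed address list.
type staticBuilder struct{ addrs []string }

func (b *staticBuilder) Scheme() string { return "static" }

func (b *staticBuilder) Build(target string, cc ClientConn) (Resolver, error) {
	r := &staticResolver{addrs: b.addrs, cc: cc}
	r.ResolveNow() // push the initial address list to the ClientConn
	return r, nil
}

type staticResolver struct {
	addrs []string
	cc    ClientConn
}

// ResolveNow re-sends the complete address list through the callback.
func (r *staticResolver) ResolveNow() { r.cc.NewAddress(r.addrs) }
func (r *staticResolver) Close()      {}

// recordingConn is a test double that remembers the last update it received.
type recordingConn struct{ last []string }

func (c *recordingConn) NewAddress(addrs []string) { c.last = addrs }

func main() {
	Register(&staticBuilder{addrs: []string{"127.0.0.1:50051", "127.0.0.1:50052"}})
	cc := &recordingConn{}
	r, err := registry["static"].Build("static:///demo", cc)
	if err != nil {
		panic(err)
	}
	defer r.Close()
	fmt.Println(cc.last)
}
```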

Code walkthrough: NewBuilder acts as a factory that creates a resolver.Builder for gRPC to call:

// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
   return &dnsBuilder{minFreq: defaultFreq}
}

The dnsResolver struct is defined as follows:

// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
    freq       time.Duration
    backoff    backoff.Exponential
    retryCount int
    host       string
    port       string
    ctx        context.Context
    cancel     context.CancelFunc
    cc         resolver.ClientConn
    // rn channel is used by ResolveNow() to force an immediate resolution of the target.
    rn chan struct{}
    t  *time.Timer
    // wg is used to enforce Close() to return after the watcher() goroutine has finished.
    // Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
    // replace the real lookup functions with mocked ones to facilitate testing.
    // If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
    // will warns lookup (READ the lookup function pointers) inside watcher() goroutine
    // has data race with replaceNetFunc (WRITE the lookup function pointers).
    wg                   sync.WaitGroup
    disableServiceConfig bool
}

Build creates and starts a DNS resolver that watches the name resolution of the target.

// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
    // ...
    host, port, err := parseTarget(target.Endpoint)

    // IP address.
    // Analysis omitted ...

    // DNS address (non-IP).
    // Create a ctx and cancel func used to track and stop the watcher
    ctx, cancel := context.WithCancel(context.Background())
    d := &dnsResolver{
        freq:                 b.minFreq,
        backoff:              backoff.Exponential{MaxDelay: b.minFreq},
        host:                 host,
        port:                 port,
        ctx:                  ctx,
        cancel:               cancel,
        cc:                   cc,
        t:                    time.NewTimer(0),
        rn:                   make(chan struct{}, 1),
        disableServiceConfig: opts.DisableServiceConfig,
    }

    d.wg.Add(1)
    go d.watcher()  // start the watcher goroutine
    return d, nil
}
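
Build relies on parseTarget, whose body is not shown here. The following is a sketch of what such a helper might do, assuming it splits the endpoint into host and port and falls back to a default port when none is given. The `defaultPort` parameter and the exact error messages are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// parseTarget splits target into host and port. If target has no port,
// defaultPort is used. This is an illustrative sketch, not the library code.
func parseTarget(target, defaultPort string) (host, port string, err error) {
	if target == "" {
		return "", "", errors.New("missing address")
	}
	if ip := net.ParseIP(target); ip != nil {
		// target is a bare IPv4 or IPv6 address without a port.
		return target, defaultPort, nil
	}
	if host, port, err = net.SplitHostPort(target); err == nil {
		if port == "" {
			return "", "", errors.New("missing port after colon")
		}
		if host == "" {
			host = "localhost" // ":80" style targets
		}
		return host, port, nil
	}
	// No port present: retry with the default port appended.
	if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
		return host, port, nil
	}
	return "", "", fmt.Errorf("invalid target address %q: %v", target, err)
}

func main() {
	for _, t := range []string{"example.com", "example.com:8080", "[::1]:80", "::1"} {
		host, port, err := parseTarget(t, "443")
		fmt.Println(t, "->", host, port, err)
	}
}
```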

The cc resolver.ClientConn parameter: ClientConn contains the callbacks the resolver uses to notify the gRPC ClientConn of any updates.

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
    // NewAddress is called by resolver to notify ClientConn a new list
    // of resolved addresses.
    // The address list should be the complete list of resolved addresses.
    NewAddress(addresses []Address)
    // NewServiceConfig is called by resolver to notify ClientConn a new
    // service config. The service config should be provided as a json string.
    NewServiceConfig(serviceConfig string)
}

The watcher function is implemented as follows:

func (d *dnsResolver) watcher() {
    defer d.wg.Done()
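    // Loop forever, waiting for the next resolution trigger
    for {
        select {
        case <-d.ctx.Done(): // cancelled through ctx: leave the loop
            return
        case <-d.t.C:        // timer fired; it starts at 0, so the first pass proceeds immediately
        case <-d.rn:         // rn is the channel ResolveNow() uses to force an immediate resolution;
                             // ResolveNow does: case d.rn <- struct{}{}:
        }
        // Run the lookup in this watcher goroutine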
        result, sc := d.lookup()
        // Next lookup should happen within an interval defined by d.freq. It may be
        // more often due to exponential retry on empty address list.
        if len(result) == 0 {
            d.retryCount++
            d.t.Reset(d.backoff.Backoff(d.retryCount))
        } else {
            d.retryCount = 0
            d.t.Reset(d.freq)
        }
        d.cc.NewServiceConfig(sc)   // notify gRPC through the callback
        d.cc.NewAddress(result)     // notify gRPC through the callback
    }
}
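
The retry branch in watcher resets the timer with a delay that grows with retryCount whenever the lookup returns no addresses. A simplified sketch of such an exponential backoff is given below. It leaves out jitter, and `backoffDelay`, `demoBase`, `demoMax` and the factor of 2 are assumptions chosen for illustration, not the values used by gRPC's backoff package.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative constants, not gRPC defaults.
const (
	demoBase = time.Second
	demoMax  = 30 * time.Second
)

// backoffDelay returns base * factor^retries, capped at max.
func backoffDelay(retries int, base, max time.Duration, factor float64) time.Duration {
	d := float64(base)
	for i := 0; i < retries && d < float64(max); i++ {
		d *= factor
	}
	if d > float64(max) {
		return max
	}
	return time.Duration(d)
}

func main() {
	for retries := 0; retries <= 6; retries++ {
		fmt.Println(retries, backoffDelay(retries, demoBase, demoMax, 2))
	}
}
```

Capping the delay matters here because Build sets MaxDelay to the normal polling frequency, so retries on an empty result never wait longer than a regular refresh would.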

Calling Close stops the watcher:

// Close closes the dnsResolver.
func (d *dnsResolver) Close() {
    d.cancel()
    d.wg.Wait()
    d.t.Stop()
}
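
The lifecycle above (a timer-driven loop, a ResolveNow hint, and a Close that waits for the goroutine) can be reproduced in a small standalone sketch. `poller`, `newPoller`, `demoFreq` and the `lookup`/`notify` callbacks are hypothetical names; `notify` stands in for the ClientConn callbacks, and retry backoff is omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoFreq is an illustrative polling interval.
const demoFreq = time.Hour

// poller is a hypothetical, trimmed-down analogue of dnsResolver.
type poller struct {
	freq   time.Duration
	lookup func() []string // hypothetical lookup function
	notify func([]string)  // stands in for cc.NewAddress
	ctx    context.Context
	cancel context.CancelFunc
	rn     chan struct{}
	t      *time.Timer
	wg     sync.WaitGroup
}

func newPoller(freq time.Duration, lookup func() []string, notify func([]string)) *poller {
	ctx, cancel := context.WithCancel(context.Background())
	p := &poller{freq: freq, lookup: lookup, notify: notify, ctx: ctx, cancel: cancel,
		rn: make(chan struct{}, 1), t: time.NewTimer(0)}
	p.wg.Add(1)
	go p.watcher()
	return p
}

func (p *poller) watcher() {
	defer p.wg.Done()
	for {
		select {
		case <-p.ctx.Done():
			return
		case <-p.t.C: // periodic refresh; fires immediately the first time
		case <-p.rn: // forced refresh requested by ResolveNow
		}
		p.notify(p.lookup())
		p.t.Reset(p.freq)
	}
}

// ResolveNow is only a hint: if a request is already queued, it does nothing.
func (p *poller) ResolveNow() {
	select {
	case p.rn <- struct{}{}:
	default:
	}
}

// Close cancels the watcher and waits for it to exit before stopping the timer.
func (p *poller) Close() {
	p.cancel()
	p.wg.Wait()
	p.t.Stop()
}

func main() {
	updates := make(chan []string, 16) // buffered so notify never blocks the watcher
	p := newPoller(demoFreq,
		func() []string { return []string{"10.0.0.1:2379"} },
		func(addrs []string) { updates <- addrs })
	fmt.Println("initial:", <-updates)
	p.ResolveNow()
	fmt.Println("forced: ", <-updates)
	p.Close() // returns only after the watcher goroutine has exited
}
```

The buffered rn channel with a non-blocking send is what lets ResolveNow be called many times concurrently without piling up work: at most one forced resolution is pending at any time.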

RR Balancer

The Balancer Builder interface is defined as follows:

// Builder creates a balancer.
type Builder interface {
    // Build creates a new balancer with the ClientConn.
    Build(cc ClientConn, opts BuildOptions) Balancer
    // Name returns the name of balancers built by this builder.
    // It will be used to pick balancers (for example in service config).
    Name() string
}

The Balancer interface is defined as follows:

// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
    // HandleSubConnStateChange is called by gRPC when the connectivity state
    // of sc has changed.
    // Balancer is expected to aggregate all the state of SubConn and report
    // that back to gRPC.
    // Balancer should also generate and update Pickers when its internal state has
    // been changed by the new state.
    HandleSubConnStateChange(sc SubConn, state connectivity.State)
    // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
    // balancers.
    // Balancer can create new SubConn or remove SubConn with the addresses.
    // An empty address slice and a non-nil error will be passed if the resolver returns
    // non-nil error to gRPC.
    HandleResolvedAddrs([]resolver.Address, error)
    // Close closes the balancer. The balancer is not required to call
    // ClientConn.RemoveSubConn for its existing SubConns.
    Close()
}
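
The interface comment says a Balancer is expected to aggregate the states of all its SubConns. One simple aggregation rule is sketched below: Ready if any SubConn is ready, otherwise Connecting if any is connecting, otherwise TransientFailure. Treat the rule and the local `State` type as assumptions for illustration rather than a statement about what a particular grpc-go version does.

```go
package main

import "fmt"

// State is a local stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

// aggregate folds per-SubConn states into a single balancer-level state.
// Idle and Shutdown are not produced here; in this sketch they are assumed
// to be decided elsewhere.
func aggregate(states []State) State {
	connecting := false
	for _, s := range states {
		switch s {
		case Ready:
			return Ready // one usable SubConn is enough to serve RPCs
		case Connecting:
			connecting = true
		}
	}
	if connecting {
		return Connecting
	}
	return TransientFailure
}

func main() {
	fmt.Println(aggregate([]State{TransientFailure, Ready}) == Ready)
	fmt.Println(aggregate([]State{TransientFailure, Connecting}) == Connecting)
	fmt.Println(aggregate([]State{TransientFailure}) == TransientFailure)
}
```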

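balancer/base implements most of the balancer functionality. To provide a different picking algorithm, you only need to implement the PickerBuilder and Picker interfaces on top of base.

base.NewBalancerBuilder is defined as follows:

// Function provided by base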
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
   return &baseBuilder{
      name:          name,
      pickerBuilder: pb,
   }
}

The RR Balancer is built on balancer/base, where the core logic lives. It uses base.NewBalancerBuilder to obtain a balancer.Builder implementation, which can then be registered.

// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
    return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
    balancer.Register(newBuilder())
}

The PickerBuilder interface is defined as follows:

// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
    // Build takes a slice of ready SubConns, and returns a picker that will be
    // used by gRPC to pick a SubConn.
    Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}

The round-robin balancer's rrPickerBuilder implements the PickerBuilder interface:

func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
    grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
    // Convert the map into a slice and use it to construct the rrPicker that is returned
    var scs []balancer.SubConn
    for _, sc := range readySCs {
        scs = append(scs, sc)
    }
    return &rrPicker{
        subConns: scs,
    }
}

balancer.Picker is implemented by rrPicker:

type rrPicker struct {
    // subConns is the snapshot of the roundrobin balancer when this picker was
    // created. The slice is immutable. Each Get() will do a round robin
    // selection from it and return the selected SubConn.
    subConns []balancer.SubConn

    mu   sync.Mutex
    // next records the index of the next SubConn to pick
    next int
}

func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
    if len(p.subConns) <= 0 {
        return nil, nil, balancer.ErrNoSubConnAvailable
    }

    p.mu.Lock()
    sc := p.subConns[p.next]
    p.next = (p.next + 1) % len(p.subConns)
    p.mu.Unlock()
    return sc, nil, nil
}
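
As noted earlier, a different picking algorithm only needs its own PickerBuilder and Picker. The sketch below swaps round robin for a random choice. `Picker` and `PickerBuilder` are simplified local stand-ins (addresses and SubConns are plain strings, and Pick takes no arguments), and `randomPickerBuilder` and `randomPicker` are hypothetical names, not part of grpc-go.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
)

// Simplified local stand-ins for balancer.Picker and base.PickerBuilder.
type Picker interface {
	Pick() (string, error)
}

type PickerBuilder interface {
	Build(readySCs map[string]string) Picker
}

var errNoSubConnAvailable = errors.New("no SubConn is available")

// randomPickerBuilder is a hypothetical alternative to rrPickerBuilder.
type randomPickerBuilder struct{ seed int64 }

func (b *randomPickerBuilder) Build(readySCs map[string]string) Picker {
	// Snapshot the ready SubConns into a slice, as the round-robin builder does.
	scs := make([]string, 0, len(readySCs))
	for _, sc := range readySCs {
		scs = append(scs, sc)
	}
	return &randomPicker{subConns: scs, rng: rand.New(rand.NewSource(b.seed))}
}

type randomPicker struct {
	subConns []string

	mu  sync.Mutex // *rand.Rand is not safe for concurrent use
	rng *rand.Rand
}

// Pick returns a uniformly random SubConn from the snapshot.
func (p *randomPicker) Pick() (string, error) {
	if len(p.subConns) == 0 {
		return "", errNoSubConnAvailable
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.subConns[p.rng.Intn(len(p.subConns))], nil
}

func main() {
	var pb PickerBuilder = &randomPickerBuilder{seed: 1}
	p := pb.Build(map[string]string{"10.0.0.1:50051": "sc-1", "10.0.0.2:50051": "sc-2"})
	for i := 0; i < 4; i++ {
		sc, err := p.Pick()
		fmt.Println(sc, err)
	}
}
```

Because a picker is an immutable snapshot, a change in the set of ready SubConns is handled by building a new picker rather than by mutating the old one. Only the position counter (or, here, the random source) needs a lock.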

Etcd Service Registration and Discovery

clientv3-grpc1.14: Official client implementation, with grpc-go v1.14.x, which is used in latest etcd v3.4.

etcdv3Client -> autoSync() -> Sync() -> c.SetEndpoints(eps…) -> gc.resolverGroup.SetEndpoints(eps) -> every Resolver -> ClientConn update
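
The last steps of that flow, where a resolver group pushes a new endpoint list to every resolver and each resolver updates its ClientConn, might look roughly like this. All types here (`resolverGroup`, `groupResolver`, `recordingConn`) are hypothetical stand-ins written for this sketch; they only mirror the names in the flow and are not the Etcd client's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// clientConn is a stand-in for resolver.ClientConn, reduced to address updates.
type clientConn interface {
	NewAddress(addrs []string)
}

// groupResolver is a hypothetical resolver that forwards what the group tells it.
type groupResolver struct{ cc clientConn }

// resolverGroup remembers the current endpoints and fans updates out to
// every resolver that was created from it.
type resolverGroup struct {
	mu        sync.Mutex
	endpoints []string
	resolvers []*groupResolver
}

// addResolver attaches a new resolver and seeds it with the current endpoints.
func (g *resolverGroup) addResolver(cc clientConn) *groupResolver {
	g.mu.Lock()
	defer g.mu.Unlock()
	r := &groupResolver{cc: cc}
	g.resolvers = append(g.resolvers, r)
	cc.NewAddress(g.endpoints)
	return r
}

// SetEndpoints stores the new list and notifies every resolver's ClientConn.
func (g *resolverGroup) SetEndpoints(eps []string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.endpoints = append([]string(nil), eps...) // copy so callers cannot mutate it
	for _, r := range g.resolvers {
		r.cc.NewAddress(g.endpoints)
	}
}

// recordingConn is a test double that remembers the last update it received.
type recordingConn struct{ last []string }

func (c *recordingConn) NewAddress(addrs []string) { c.last = addrs }

func main() {
	g := &resolverGroup{}
	a, b := &recordingConn{}, &recordingConn{}
	g.addResolver(a)
	g.addResolver(b)
	g.SetEndpoints([]string{"10.0.0.1:2379", "10.0.0.2:2379"})
	fmt.Println(a.last, b.last)
}
```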

