For a grpclb implementation with etcd-based service registration and discovery, see [grpclb](https://github.com/DavadDi/grpclb).
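Background
gRPC load balancing works per call rather than per connection, which keeps the load evenly spread across servers. Depending on where the logic lives, load balancers generally fall into two kinds: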
1. Proxy
2. Client Side
Thick Client
All of the load-balancing logic is implemented in the client.
Lookaside Load Balancing
Also known as one-arm load balancing.
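Etcd Load Balancer Implementations
The latest gRPC release at the time of writing is 1.14.0. The balancer implementation differs between etcd versions; see the etcd client-architecture document (etcd v3.4.0, TBD 2018-09). Depending on the clientv3 and gRPC combination, the algorithms fall into the following categories: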
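- clientv3-grpc1.0
  - The client keeps connections to several server nodes at the same time, so when an error occurs it can quickly retry on another connection.
  - Limitation: keeping multiple connections open wastes resources. The client also knows nothing about node health or cluster membership, so it may not work correctly when a node fails.
- clientv3-grpc1.7 (optional approach)
  - The client keeps a connection to a single node. When the cluster has multiple endpoints, it tries to connect to all of them; once one connection is chosen (the pinned address), it closes the others and stays on the pinned one until that connection is closed. If an error occurs during a call, the client enters its error-handling flow.
  - Limitation: it relies on HTTP/2 keepalives for heartbeats, so split-brain situations may occur.
- clientv3-grpc1.14 (v3.4.0, in development)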
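The clientv3-grpc1.7 behaviour described above (dial every endpoint, pin the first that connects, drop the rest) can be sketched roughly as follows. This is not etcd's code: `pinFirst`, `dialFunc`, and `onlyReachable` are hypothetical names, and the dial is simulated rather than a real network connection.

```go
package main

import (
	"errors"
	"fmt"
)

// dialFunc is a hypothetical stand-in for connecting to one endpoint.
type dialFunc func(endpoint string) error

// pinFirst dials every endpoint concurrently and returns the first one that
// connects (the "pinned address"). Results that arrive later are ignored,
// which models closing the other connections.
func pinFirst(endpoints []string, dial dialFunc) (string, error) {
	type result struct {
		endpoint string
		err      error
	}
	// Buffered so that no dialing goroutine blocks after we return.
	results := make(chan result, len(endpoints))
	for _, ep := range endpoints {
		go func(ep string) {
			results <- result{ep, dial(ep)}
		}(ep)
	}
	for range endpoints {
		if r := <-results; r.err == nil {
			return r.endpoint, nil
		}
	}
	return "", errors.New("no endpoint reachable")
}

// onlyReachable simulates a cluster in which a single endpoint accepts
// connections. It exists only for this demonstration.
func onlyReachable(name string) dialFunc {
	return func(ep string) error {
		if ep == name {
			return nil
		}
		return errors.New("connection refused")
	}
}

func main() {
	eps := []string{"node-a:2379", "node-b:2379", "node-c:2379"}
	pinned, err := pinFirst(eps, onlyReachable("node-b:2379"))
	fmt.Println(pinned, err) // node-b:2379 <nil>
}
```

The sketch also hints at the limitation noted in the list: once an address is pinned, nothing here notices if that node later becomes partitioned.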
gRPC 1.14 LB Analysis
dnsResolver
Registering the dnsResolver:
func init() {
	resolver.Register(NewBuilder())
}
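To make the registration step concrete, here is a minimal sketch of a scheme-keyed registry. It uses a local, trimmed-down `Builder` interface rather than the real `resolver` package, and the map-based `Register`/`Get` pair is a simplification for illustration.

```go
package main

import "fmt"

// Builder is a local stand-in for resolver.Builder; only Scheme is kept.
type Builder interface {
	Scheme() string
}

// builders maps a scheme to the Builder registered for it.
var builders = make(map[string]Builder)

// Register stores b under its scheme. A later registration for the same
// scheme replaces the earlier one. Like most init-time registries, this
// sketch is not safe for concurrent use.
func Register(b Builder) {
	builders[b.Scheme()] = b
}

// Get returns the Builder registered for scheme, or nil if there is none.
func Get(scheme string) Builder {
	if b, ok := builders[scheme]; ok {
		return b
	}
	return nil
}

// dnsBuilder is a placeholder builder for the "dns" scheme.
type dnsBuilder struct{}

func (dnsBuilder) Scheme() string { return "dns" }

func init() {
	Register(dnsBuilder{})
}

func main() {
	fmt.Println(Get("dns") != nil, Get("etcd") != nil) // true false
}
```

Registering from `init()` means the builder is available as soon as the package is imported, before any dial happens.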
The resolver's Builder interface. A Builder creates a resolver that watches for name resolution updates.
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
The Resolver interface is defined as follows:
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOption)
	// Close closes the resolver.
	Close()
}
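As a rough illustration of how a Builder and a Resolver fit together, the sketch below implements a resolver that always reports a fixed address list. All types are local stand-ins with simplified signatures (no `Target`, `BuildOption`, or `ResolveNowOption`), and `staticBuilder`, `staticResolver`, and `captureConn` are hypothetical names.

```go
package main

import "fmt"

// Local, simplified stand-ins for the resolver package types.
type Address struct{ Addr string }

type ClientConn interface {
	NewAddress(addresses []Address)
}

type Resolver interface {
	ResolveNow()
	Close()
}

// staticBuilder builds resolvers that always report the same addresses.
type staticBuilder struct{ addrs []string }

func (b *staticBuilder) Scheme() string { return "static" }

// Build creates the resolver and pushes the initial address list to cc.
func (b *staticBuilder) Build(cc ClientConn) (Resolver, error) {
	r := &staticResolver{cc: cc, addrs: b.addrs}
	r.ResolveNow()
	return r, nil
}

type staticResolver struct {
	cc    ClientConn
	addrs []string
}

// ResolveNow re-sends the complete address list through the callback.
func (r *staticResolver) ResolveNow() {
	out := make([]Address, 0, len(r.addrs))
	for _, a := range r.addrs {
		out = append(out, Address{Addr: a})
	}
	r.cc.NewAddress(out)
}

// Close has nothing to release in this sketch.
func (r *staticResolver) Close() {}

// captureConn remembers the most recent address list it was given.
type captureConn struct{ last []Address }

func (c *captureConn) NewAddress(a []Address) { c.last = a }

func main() {
	cc := &captureConn{}
	b := &staticBuilder{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051"}}
	r, err := b.Build(cc)
	if err != nil {
		panic(err)
	}
	defer r.Close()
	fmt.Println(len(cc.last), cc.last[0].Addr) // 2 10.0.0.1:50051
}
```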
Walking through the code: NewBuilder acts as a factory that creates a resolver.Builder for gRPC to call:
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
	return &dnsBuilder{minFreq: defaultFreq}
}
The dnsResolver struct is defined as follows:
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
	freq       time.Duration
	backoff    backoff.Exponential
	retryCount int
	host       string
	port       string
	ctx        context.Context
	cancel     context.CancelFunc
	cc         resolver.ClientConn
	// rn channel is used by ResolveNow() to force an immediate resolution of the target.
	rn chan struct{}
	t  *time.Timer
	// wg is used to enforce Close() to return after the watcher() goroutine has finished.
	// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
	// replace the real lookup functions with mocked ones to facilitate testing.
	// If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
	// will warns lookup (READ the lookup function pointers) inside watcher() goroutine
	// has data race with replaceNetFunc (WRITE the lookup function pointers).
	wg                   sync.WaitGroup
	disableServiceConfig bool
}
Build creates and starts a DNS resolver that watches the name resolution of the target.
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	// ...
	host, port, err := parseTarget(target.Endpoint)
	if err != nil {
		return nil, err
	}
	// IP address.
	// (analysis omitted) ...
	// DNS address (non-IP).
	// Create a ctx and cancel function used to track and stop the watcher.
	ctx, cancel := context.WithCancel(context.Background())
	d := &dnsResolver{
		freq:                 b.minFreq,
		backoff:              backoff.Exponential{MaxDelay: b.minFreq},
		host:                 host,
		port:                 port,
		ctx:                  ctx,
		cancel:               cancel,
		cc:                   cc,
		t:                    time.NewTimer(0),
		rn:                   make(chan struct{}, 1),
		disableServiceConfig: opts.DisableServiceConfig,
	}
	d.wg.Add(1)
	go d.watcher() // start the watcher goroutine
	return d, nil
}
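Build starts by splitting the endpoint into a host and a port. The sketch below shows one way such a split could work, using only the standard library. `splitTarget` is a hypothetical, simplified helper and not the library's `parseTarget`; the default port of 443 is an assumption made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// defaultPort is the port assumed when the target does not name one.
// The value is an assumption for this sketch.
const defaultPort = "443"

// splitTarget splits target into host and port, falling back to defaultPort.
func splitTarget(target string) (host, port string, err error) {
	if target == "" {
		return "", "", errors.New("missing address")
	}
	// A bare IP (IPv4, or IPv6 without brackets) carries no port.
	if net.ParseIP(target) != nil {
		return target, defaultPort, nil
	}
	// host:port, ip:port or [ipv6]:port.
	if host, port, err = net.SplitHostPort(target); err == nil {
		if port == "" {
			port = defaultPort
		}
		return host, port, nil
	}
	// No port given: retry with the default port appended.
	if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
		return host, port, nil
	}
	return "", "", fmt.Errorf("invalid target %q: %v", target, err)
}

func main() {
	for _, t := range []string{"example.com", "example.com:8080", "10.0.0.1", "[::1]:9000"} {
		h, p, err := splitTarget(t)
		// The last column reports whether the host is an IP literal, which is
		// the distinction Build uses to choose between its two branches.
		fmt.Println(h, p, err, net.ParseIP(h) != nil)
	}
}
```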
The cc resolver.ClientConn parameter holds the callbacks that a resolver uses to notify the gRPC ClientConn of any updates.
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	NewServiceConfig(serviceConfig string)
}
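The interface comment recommends embedding ClientConn when a test needs its own implementation. A minimal sketch of that pattern, with local stand-in types and hypothetical names (`noopConn`, `recordingConn`), might look like this:

```go
package main

import "fmt"

// Address and ClientConn are local stand-ins for the resolver package types.
type Address struct{ Addr string }

type ClientConn interface {
	NewAddress(addresses []Address)
	NewServiceConfig(serviceConfig string)
}

// noopConn is a do-nothing ClientConn used as the embedded base. In a real
// test the embedded value would come from the code under test.
type noopConn struct{}

func (noopConn) NewAddress([]Address)    {}
func (noopConn) NewServiceConfig(string) {}

// recordingConn embeds the ClientConn interface and overrides only
// NewAddress. Every method it does not override is forwarded to the
// embedded value, so the type keeps satisfying ClientConn when methods
// are added to the interface.
type recordingConn struct {
	ClientConn
	updates [][]Address
}

func (c *recordingConn) NewAddress(addrs []Address) {
	c.updates = append(c.updates, addrs)
}

func main() {
	cc := &recordingConn{ClientConn: noopConn{}}
	var _ ClientConn = cc // compile-time interface check

	cc.NewAddress([]Address{{Addr: "10.0.0.1:50051"}})
	cc.NewServiceConfig(`{}`) // forwarded to noopConn
	fmt.Println(len(cc.updates), cc.updates[0][0].Addr) // 1 10.0.0.1:50051
}
```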
The watcher function is implemented as follows:
func (d *dnsResolver) watcher() {
	defer d.wg.Done()
	// Loop forever, waiting for the next trigger.
	for {
		select {
		case <-d.ctx.Done(): // cancelled through ctx: leave the loop
			return
		case <-d.t.C: // timer fired; the initial timer is 0, so the first pass runs immediately
		case <-d.rn: // ResolveNow() asked for an immediate resolution
			// ResolveNow signals this with: case d.rn <- struct{}{}:
		}
		// Run the lookup (in this goroutine, so the caller is never blocked).
		result, sc := d.lookup()
		// Next lookup should happen within an interval defined by d.freq. It may be
		// more often due to exponential retry on empty address list.
		if len(result) == 0 {
			d.retryCount++
			d.t.Reset(d.backoff.Backoff(d.retryCount))
		} else {
			d.retryCount = 0
			d.t.Reset(d.freq)
		}
		d.cc.NewServiceConfig(sc) // notify gRPC through the callback
		d.cc.NewAddress(result)   // notify gRPC through the callback
	}
}
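The watcher combines three triggers: a timer, an on-demand signal, and cancellation. The following self-contained sketch reproduces that pattern with a fake lookup. `poller` and `runDemo` are hypothetical names, and the exponential backoff on empty results is left out to keep the example short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// poller is a stripped-down analogue of dnsResolver for illustration only.
type poller struct {
	freq   time.Duration
	lookup func() []string
	update func([]string)
	ctx    context.Context
	cancel context.CancelFunc
	rn     chan struct{} // capacity 1: at most one pending "resolve now"
	t      *time.Timer
	wg     sync.WaitGroup
}

func newPoller(freq time.Duration, lookup func() []string, update func([]string)) *poller {
	ctx, cancel := context.WithCancel(context.Background())
	p := &poller{
		freq: freq, lookup: lookup, update: update,
		ctx: ctx, cancel: cancel,
		rn: make(chan struct{}, 1),
		t:  time.NewTimer(0), // fires at once, so the first lookup is immediate
	}
	p.wg.Add(1)
	go p.watcher()
	return p
}

func (p *poller) watcher() {
	defer p.wg.Done()
	for {
		select {
		case <-p.ctx.Done():
			return
		case <-p.t.C:
		case <-p.rn:
		}
		p.update(p.lookup())
		p.t.Reset(p.freq)
	}
}

// ResolveNow requests a lookup without blocking; if a request is already
// pending, the new one is dropped.
func (p *poller) ResolveNow() {
	select {
	case p.rn <- struct{}{}:
	default:
	}
}

// Close cancels the watcher and waits for its goroutine to exit.
func (p *poller) Close() {
	p.cancel()
	p.wg.Wait()
	p.t.Stop()
}

// runDemo performs one timer-driven and one forced lookup, then returns the
// number of lookups that ran.
func runDemo() int {
	lookups := 0
	updates := make(chan []string, 8)
	p := newPoller(time.Hour,
		func() []string { lookups++; return []string{"10.0.0.1:50051"} },
		func(addrs []string) { updates <- addrs })
	<-updates      // first lookup, triggered by the zero-duration timer
	p.ResolveNow() // force a second lookup instead of waiting an hour
	<-updates
	p.Close() // after this returns, reading lookups is race-free
	return lookups
}

func main() {
	fmt.Println(runDemo()) // 2
}
```

Giving `rn` a capacity of one is what lets ResolveNow stay non-blocking while still guaranteeing that a request made during a lookup is not lost.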
Calling Close stops the watcher:
// Close closes the dnsResolver.
func (d *dnsResolver) Close() {
	d.cancel()
	d.wg.Wait()
	d.t.Stop()
}
RR Balancer
The Balancer Builder interface is defined as follows:
// Builder creates a balancer.
type Builder interface {
	// Build creates a new balancer with the ClientConn.
	Build(cc ClientConn, opts BuildOptions) Balancer
	// Name returns the name of balancers built by this builder.
	// It will be used to pick balancers (for example in service config).
	Name() string
}
The Balancer interface is defined as follows:
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
	// HandleSubConnStateChange is called by gRPC when the connectivity state
	// of sc has changed.
	// Balancer is expected to aggregate all the state of SubConn and report
	// that back to gRPC.
	// Balancer should also generate and update Pickers when its internal state has
	// been changed by the new state.
	HandleSubConnStateChange(sc SubConn, state connectivity.State)
	// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
	// balancers.
	// Balancer can create new SubConn or remove SubConn with the addresses.
	// An empty address slice and a non-nil error will be passed if the resolver returns
	// non-nil error to gRPC.
	HandleResolvedAddrs([]resolver.Address, error)
	// Close closes the balancer. The balancer is not required to call
	// ClientConn.RemoveSubConn for its existing SubConns.
	Close()
}
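The interface asks a balancer to aggregate the states of all its SubConns into one state. The sketch below shows one plausible aggregation rule: Ready if any SubConn is Ready, otherwise Connecting if any is Connecting, otherwise TransientFailure. The `State` type and `aggregator` are local, hypothetical definitions, and the rule itself is an assumption for illustration rather than a statement of what every balancer does.

```go
package main

import "fmt"

// State is a local stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
)

func (s State) String() string {
	return [...]string{"IDLE", "CONNECTING", "READY", "TRANSIENT_FAILURE"}[s]
}

// aggregator counts how many SubConns are Ready or Connecting.
type aggregator struct {
	numReady      int
	numConnecting int
}

// record applies one SubConn's transition from oldState to newState and
// returns the aggregated state of the whole balancer.
func (a *aggregator) record(oldState, newState State) State {
	for i, s := range []State{oldState, newState} {
		delta := 2*i - 1 // -1 for the state being left, +1 for the state entered
		switch s {
		case Ready:
			a.numReady += delta
		case Connecting:
			a.numConnecting += delta
		}
	}
	switch {
	case a.numReady > 0:
		return Ready
	case a.numConnecting > 0:
		return Connecting
	default:
		return TransientFailure
	}
}

func main() {
	var a aggregator
	fmt.Println(a.record(Idle, Connecting))             // sc1 starts dialing: CONNECTING
	fmt.Println(a.record(Connecting, Ready))            // sc1 connects: READY
	fmt.Println(a.record(Idle, Connecting))             // sc2 starts dialing: READY
	fmt.Println(a.record(Ready, TransientFailure))      // sc1 fails: CONNECTING
	fmt.Println(a.record(Connecting, TransientFailure)) // sc2 fails: TRANSIENT_FAILURE
}
```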
balancer/base implements most of the balancer functionality. To provide a different selection algorithm, you only need to implement the PickerBuilder and Picker interfaces on top of base.
The base.NewBalancerBuilder function is defined as follows:
// Function provided by the base package.
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
	return &baseBuilder{
		name:          name,
		pickerBuilder: pb,
	}
}
The RR balancer is built on balancer/base, where the core functionality lives. It calls base.NewBalancerBuilder to obtain a balancer.Builder implementation, which can then be registered.
// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
	balancer.Register(newBuilder())
}
The PickerBuilder interface is defined as follows:
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build takes a slice of ready SubConns, and returns a picker that will be
	// used by gRPC to pick a SubConn.
	Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
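Since a new selection algorithm only needs a PickerBuilder and a Picker, a different policy is small to write. The sketch below implements random selection against local stand-in types with simplified signatures (no context, options, or done callback). `randomPickerBuilder` and `randomPicker` are hypothetical and not part of gRPC.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// Local, simplified stand-ins for the resolver and balancer types.
type Address struct{ Addr string }

type SubConn string

type Picker interface {
	Pick() (SubConn, error)
}

type PickerBuilder interface {
	Build(readySCs map[Address]SubConn) Picker
}

var errNoSubConnAvailable = errors.New("no SubConn is available")

// randomPickerBuilder builds pickers that choose a ready SubConn at random.
type randomPickerBuilder struct{}

// Build snapshots the ready SubConns into a slice owned by the new picker.
func (randomPickerBuilder) Build(readySCs map[Address]SubConn) Picker {
	scs := make([]SubConn, 0, len(readySCs))
	for _, sc := range readySCs {
		scs = append(scs, sc)
	}
	return &randomPicker{subConns: scs}
}

type randomPicker struct {
	subConns []SubConn // never modified after Build, so no lock is needed
}

// Pick returns a uniformly random SubConn from the snapshot.
func (p *randomPicker) Pick() (SubConn, error) {
	if len(p.subConns) == 0 {
		return "", errNoSubConnAvailable
	}
	return p.subConns[rand.Intn(len(p.subConns))], nil
}

func main() {
	var pb PickerBuilder = randomPickerBuilder{}
	picker := pb.Build(map[Address]SubConn{
		{Addr: "10.0.0.1:50051"}: "sc-1",
		{Addr: "10.0.0.2:50051"}: "sc-2",
	})
	for i := 0; i < 4; i++ {
		sc, err := picker.Pick()
		fmt.Println(sc, err) // order varies from run to run
	}
}
```

Because a fresh picker is built whenever the set of ready SubConns changes, the picker itself can treat its slice as immutable.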
The round-robin balancer's rrPickerBuilder implements the PickerBuilder interface:
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
	grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
	// Convert the map into a slice, then use it to construct and return an rrPicker.
	var scs []balancer.SubConn
	for _, sc := range readySCs {
		scs = append(scs, sc)
	}
	return &rrPicker{
		subConns: scs,
	}
}
balancer.Picker is implemented by rrPicker:
type rrPicker struct {
	// subConns is the snapshot of the roundrobin balancer when this picker was
	// created. The slice is immutable. Each Get() will do a round robin
	// selection from it and return the selected SubConn.
	subConns []balancer.SubConn
	mu       sync.Mutex
	// next is the index of the SubConn that the next Pick will return.
	next int
}

func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	if len(p.subConns) <= 0 {
		return nil, nil, balancer.ErrNoSubConnAvailable
	}
	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return sc, nil, nil
}
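To see why the mutex around `next` matters, the sketch below runs the same rotation logic from several goroutines and counts how often each backend is chosen. It works on plain strings instead of SubConns; `rr` and `countPicks` are hypothetical names for this demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

// rr mirrors the rotation logic of rrPicker, using strings as backends.
type rr struct {
	mu    sync.Mutex
	items []string // immutable after construction
	next  int
}

// pick returns the next item in rotation; ok is false when there are none.
func (p *rr) pick() (item string, ok bool) {
	if len(p.items) == 0 {
		return "", false
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	item = p.items[p.next]
	p.next = (p.next + 1) % len(p.items)
	return item, true
}

// countPicks calls pick from several goroutines and tallies the results.
func countPicks(p *rr, workers, perWorker int) map[string]int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		counts = make(map[string]int)
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				if s, ok := p.pick(); ok {
					mu.Lock()
					counts[s]++
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	p := &rr{items: []string{"a", "b", "c"}}
	// 6 workers x 50 picks = 300 picks over 3 backends.
	fmt.Println(countPicks(p, 6, 50)) // map[a:100 b:100 c:100]
}
```

Each pick advances the index exactly once under the lock, so 300 picks over three backends always split 100 each, regardless of how the goroutines interleave.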
Etcd Service Registration and Discovery
clientv3-grpc1.14: Official client implementation, with grpc-go v1.14.x, which is used in latest etcd v3.4.
etcdv3Client -> autoSync() -> Sync() -> c.SetEndpoints(eps…) -> gc.resolverGroup.SetEndpoints(eps) -> every Resolver -> ClientConn update
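The last hops of that chain, where a new endpoint list fans out to every resolver and on to its ClientConn, can be sketched as below. This is an illustration with local stand-in types, not etcd's implementation; `resolverGroup`, `attach`, `toAddrs`, and `lastSeen` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for the resolver package types.
type Address struct{ Addr string }

type ClientConn interface {
	NewAddress(addresses []Address)
}

// resolverGroup remembers the current endpoints and the ClientConn of every
// resolver created from it.
type resolverGroup struct {
	mu        sync.Mutex
	endpoints []string
	conns     []ClientConn
}

// attach registers the ClientConn of a newly built resolver and immediately
// pushes the current endpoint list to it.
func (g *resolverGroup) attach(cc ClientConn) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.conns = append(g.conns, cc)
	cc.NewAddress(toAddrs(g.endpoints))
}

// SetEndpoints stores the new list and notifies every attached ClientConn.
func (g *resolverGroup) SetEndpoints(eps []string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.endpoints = eps
	for _, cc := range g.conns {
		cc.NewAddress(toAddrs(eps))
	}
}

// toAddrs converts endpoint strings into Address values.
func toAddrs(eps []string) []Address {
	addrs := make([]Address, 0, len(eps))
	for _, ep := range eps {
		addrs = append(addrs, Address{Addr: ep})
	}
	return addrs
}

// lastSeen is a ClientConn that keeps the most recent address list.
type lastSeen struct{ addrs []Address }

func (c *lastSeen) NewAddress(a []Address) { c.addrs = a }

func main() {
	g := &resolverGroup{endpoints: []string{"node-a:2379"}}
	c1, c2 := &lastSeen{}, &lastSeen{}
	g.attach(c1)
	g.attach(c2)

	// A membership sync discovers two more nodes.
	g.SetEndpoints([]string{"node-a:2379", "node-b:2379", "node-c:2379"})
	fmt.Println(len(c1.addrs), len(c2.addrs)) // 3 3
}
```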