Skip to content

Commit

Permalink
health: Client LB channel health checking (grpc#2387)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan authored Nov 1, 2018
1 parent f4273b1 commit 105f614
Show file tree
Hide file tree
Showing 10 changed files with 1,394 additions and 14 deletions.
3 changes: 3 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type NewSubConnOptions struct {
// SubConn. If it's nil, the original creds from grpc DialOptions will be
// used.
CredsBundle credentials.Bundle
// HealthCheckEnabled indicates whether health check service should be
// enabled on this SubConn
HealthCheckEnabled bool
}

// ClientConn represents a gRPC ClientConn.
Expand Down
5 changes: 4 additions & 1 deletion balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
config Config
}

func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
Expand All @@ -43,6 +44,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
config: bb.config,
}
}

Expand All @@ -60,6 +62,7 @@ type baseBalancer struct {
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
}

func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
Expand All @@ -74,7 +77,7 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
Expand Down
12 changes: 12 additions & 0 deletions balancer/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,20 @@ type PickerBuilder interface {
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return NewBalancerBuilderWithConfig(name, pb, Config{})
}

// Config contains the config info about the base balancer builder.
type Config struct {
// HealthCheck indicates whether health checking should be enabled for this specific balancer.
HealthCheck bool
}

// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
config: config,
}
}
2 changes: 1 addition & 1 deletion balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {
Expand Down
108 changes: 100 additions & 8 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -306,7 +307,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface{ Temporary() bool })
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
Expand Down Expand Up @@ -715,6 +718,12 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
return m
}

func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
cc.mu.RLock()
defer cc.mu.RUnlock()
return cc.sc.healthCheckConfig
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
hdr, _ := metadata.FromOutgoingContext(ctx)
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
Expand Down Expand Up @@ -877,6 +886,10 @@ type addrConn struct {
acbw balancer.SubConn
scopts balancer.NewSubConnOptions

// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
// health checking may require server to report healthy to set ac to READY), and is reset
// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
// is received, transport is closed, ac has been torn down).
transport transport.ClientTransport // The current transport.

mu sync.Mutex
Expand All @@ -903,6 +916,8 @@ type addrConn struct {
czData *channelzData

successfulHandshake bool

healthCheckEnabled bool
}

// Note: this requires a lock on ac.mu.
Expand Down Expand Up @@ -956,6 +971,8 @@ func (ac *addrConn) resetTransport(resolveNow bool) {
return
}

// The transport that was used before is no longer viable.
ac.transport = nil
// If the connection is READY, a failure must have occurred.
// Otherwise, we'll consider this is a transient failure when:
// We've exhausted all addresses
Expand Down Expand Up @@ -1044,7 +1061,10 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
var serverPrefaceReceived bool
var clientPrefaceWrote bool

hcCtx, hcCancel := context.WithCancel(ac.ctx)

onGoAway := func(r transport.GoAwayReason) {
hcCancel()
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
Expand All @@ -1059,6 +1079,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))

onClose := func() {
hcCancel()
close(onCloseCalled)
prefaceTimer.Stop()

Expand Down Expand Up @@ -1166,22 +1187,46 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
return err
}

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
close(skipReset)
newTr.Close()
return nil
}
ac.transport = newTr
ac.mu.Unlock()

healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if internal.HealthCheckFunc != nil {
go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
close(allowedToReset)
return nil
}
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
}

// No LB channel health check case
ac.mu.Lock()

if ac.state == connectivity.Shutdown {
ac.mu.Unlock()

// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
// unblock onGoAway/onClose callback.
close(skipReset)

newTr.Close()
return errConnClosing
}

ac.updateConnectivityState(connectivity.Ready)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.transport = newTr
ac.curAddr = addr

ac.mu.Unlock()
Expand All @@ -1192,6 +1237,51 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
return nil
}

func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
// Set up the health check helper functions
newStream := func() (interface{}, error) {
return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
}
firstReady := true
reportHealth := func(ok bool) {
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.transport != newTr {
return
}
if ok {
if firstReady {
firstReady = false
ac.curAddr = addr
}
if ac.state != connectivity.Ready {
ac.updateConnectivityState(connectivity.Ready)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
} else {
if ac.state != connectivity.TransientFailure {
ac.updateConnectivityState(connectivity.TransientFailure)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
}
}

err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
Severity: channelz.CtError,
})
}
grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
}
}

// nextAddr increments the addrIdx if there are more addresses to try. If
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
// increment the backoffIdx.
Expand Down Expand Up @@ -1279,21 +1369,23 @@ func (ac *addrConn) tearDown(err error) {
ac.mu.Unlock()
return
}
curTr := ac.transport
ac.transport = nil
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancelation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac.mu.Unlock()
ac.transport.GracefulClose()
curTr.GracefulClose()
ac.mu.Lock()
}
if channelz.IsOn() {
Expand Down
9 changes: 9 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type dialOptions struct {
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -454,6 +455,14 @@ func WithMaxHeaderListSize(s uint32) DialOption {
})
}

// WithDisableHealthCheck disables the LB channel health checking for all SubConns of this ClientConn.
//
// This API is EXPERIMENTAL.
func WithDisableHealthCheck() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableHealthCheck = true
})
}
func defaultDialOptions() dialOptions {
return dialOptions{
disableRetry: !envconfig.Retry,
Expand Down
2 changes: 1 addition & 1 deletion rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *gzipDecompressor) Type() string {
type callInfo struct {
compressorType string
failFast bool
stream *clientStream
stream ClientStream
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
Expand Down
17 changes: 14 additions & 3 deletions service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type ServiceConfig struct {
// If token_count is less than or equal to maxTokens / 2, then RPCs will not
// be retried and hedged RPCs will not be sent.
retryThrottling *retryThrottlingPolicy
// healthCheckConfig must be set as one of the requirement to enable LB channel
// health check.
healthCheckConfig *healthCheckConfig
}

// healthCheckConfig defines the go-native version of the LB channel health check config.
type healthCheckConfig struct {
// serviceName is the service name to use in the health-checking request.
ServiceName string
}

// retryPolicy defines the go-native version of the retry policy defined by the
Expand Down Expand Up @@ -226,6 +235,7 @@ type jsonSC struct {
LoadBalancingPolicy *string
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}

func parseServiceConfig(js string) (ServiceConfig, error) {
Expand All @@ -239,9 +249,10 @@ func parseServiceConfig(js string) (ServiceConfig, error) {
return ServiceConfig{}, err
}
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
Methods: make(map[string]MethodConfig),
retryThrottling: rsc.RetryThrottling,
LB: rsc.LoadBalancingPolicy,
Methods: make(map[string]MethodConfig),
retryThrottling: rsc.RetryThrottling,
healthCheckConfig: rsc.HealthCheckConfig,
}
if rsc.MethodConfig == nil {
return sc, nil
Expand Down
Loading

0 comments on commit 105f614

Please sign in to comment.