Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: add V2Picker, ClientConn.UpdateState, SubConnState.ConnectionError #3186

Merged
merged 16 commits into from
Nov 21, 2019
Prev Previous commit
Next Next commit
pickfirst: update to V2 API; return error on empty address list
  • Loading branch information
dfawley committed Nov 15, 2019
commit 9d1ddfe8c8fa695035908b5927b4ca92854db5bc
4 changes: 1 addition & 3 deletions balancer_conn_wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) {
func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return b.updateClientConnState(s)
}
func (*funcBalancer) ResolverError(error) {
panic("unimplemented") // resolver never reports error
}
func (*funcBalancer) ResolverError(error) {}
func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("unimplemented") // we never have sub-conns
}
Expand Down
45 changes: 32 additions & 13 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,50 @@ type pickfirstBalancer struct {
sc balancer.SubConn
}

var _ balancer.V2Balancer = &pickfirstBalancer{} // Assert we implement v2

func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
}
b.ResolverError(err)
return
}
b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) // Ignore error
}

func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s})
}

func (b *pickfirstBalancer) ResolverError(err error) {
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: ResolverError called with error %v", err)
}
}

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
if len(cs.ResolverState.Addresses) == 0 {
return balancer.ErrBadResolverState
}

if b.sc == nil {
b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
var err error
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
//TODO(yuxuanli): why not change the cc state to Idle?
if grpclog.V(2) {
grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
return
return balancer.ErrBadResolverState
}
b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc})
b.sc.Connect()
} else {
b.sc.UpdateAddresses(addrs)
b.sc.UpdateAddresses(cs.ResolverState.Addresses)
b.sc.Connect()
}
return nil
}

func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
}
Expand All @@ -83,18 +102,18 @@ func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s conn
}
return
}
if s == connectivity.Shutdown {
if s.ConnectivityState == connectivity.Shutdown {
b.sc = nil
return
}

switch s {
switch s.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
b.cc.UpdateBalancerState(s, &picker{sc: sc})
b.cc.UpdateBalancerState(s.ConnectivityState, &picker{sc: sc})
case connectivity.Connecting:
b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrNoSubConnAvailable})
b.cc.UpdateBalancerState(s.ConnectivityState, &picker{err: balancer.ErrNoSubConnAvailable})
case connectivity.TransientFailure:
b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrTransientFailure})
b.cc.UpdateBalancerState(s.ConnectivityState, &picker{err: balancer.ErrTransientFailure})
}
}

Expand Down
20 changes: 18 additions & 2 deletions resolver_conn_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -178,6 +179,19 @@ func testResolverErrorPolling(t *testing.T, badUpdate func(*manual.Resolver), go
}
}

const happyBalancerName = "happy balancer"

func init() {
// Register a balancer that never returns an error from
// UpdateClientConnState, and doesn't do anything else either.
fb := &funcBalancer{
updateClientConnState: func(s balancer.ClientConnState) error {
return nil
},
}
balancer.Register(&funcBalancerBuilder{name: happyBalancerName, instance: fb})
}

// TestResolverErrorPolling injects resolver errors and verifies ResolveNow is
// called with the appropriate backoff strategy being consulted between
// ResolveNow calls.
Expand All @@ -188,7 +202,8 @@ func (s) TestResolverErrorPolling(t *testing.T) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine.
go r.CC.UpdateState(resolver.State{})
})
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName)))
}

// TestServiceConfigErrorPolling injects a service config error and verifies
Expand All @@ -202,7 +217,8 @@ func (s) TestServiceConfigErrorPolling(t *testing.T) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine.
go r.CC.UpdateState(resolver.State{})
})
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName)))
}

// TestResolverErrorInBuild makes the resolver.Builder call into the ClientConn
Expand Down