Skip to content

Commit

Permalink
make downErr for Balancer down closure
Browse files Browse the repository at this point in the history
  • Loading branch information
iamqizhao committed May 25, 2016
1 parent 8eab9cb commit 9dc3da0
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 40 deletions.
21 changes: 21 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
package grpc

import (
"fmt"
"sync"

"golang.org/x/net/context"
Expand Down Expand Up @@ -103,6 +104,26 @@ type Balancer interface {
Close() error
}

// downErr implements net.Error. It is contructed by gRPC internals and passed to the down
// call of Balancer.
type downErr struct {
timeout bool
temporary bool
desc string
}

func (e downErr) Error() string { return e.desc }
func (e downErr) Timeout() bool { return e.timeout }
func (e downErr) Temporary() bool { return e.temporary }

func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr {
return downErr{
timeout: timeout,
temporary: temporary,
desc: fmt.Sprintf(format, a...),
}
}

// RoundRobin returns a Balancer that selects addresses round-robin. It starts to watch
// the name resolution updates.
func RoundRobin(r naming.Resolver) Balancer {
Expand Down
3 changes: 2 additions & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if err == ErrClientConnClosing {
return toRPCErr(err)
return Errorf(codes.FailedPrecondition, "%v", err)
}
if _, ok := err.(transport.StreamError); ok {
return toRPCErr(err)
Expand Down Expand Up @@ -189,6 +189,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok {
if c.failFast {
Expand Down
63 changes: 32 additions & 31 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,27 @@ import (
)

var (
// ErrNoTransportSecurity indicates that there is no transport security
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")

// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
ErrNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
// ErrCredentialsMisuse indicates that users want to transmit security information
errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
// errCredentialsMisuse indicates that users want to transmit security information
// (e.g., oauth2 token) which requires secure connection on an insecure
// connection.
ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = Errorf(codes.FailedPrecondition, "grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the connection could not be
errCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
// errClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// ErrNetworkIP indicates that the connection is down due to some network I/O error.
ErrNetworkIO = errors.New("grpc: failed with network I/O error")
// ErrConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
ErrConnDrain = errors.New("grpc: the connection is drained")
// ErrConnClosing
ErrConnClosing = errors.New("grpc: the addrConn is closing")
errClientConnTimeout = errors.New("grpc: timed out trying to connect")
// errNetworkIP indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
Expand Down Expand Up @@ -337,7 +338,7 @@ func (cc *ClientConn) controller() {
}
}
for _, c := range del {
c.tearDown(ErrConnDrain)
c.tearDown(errConnDrain)
}
}
}
Expand All @@ -360,12 +361,12 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
}
if !ok {
return ErrNoTransportSecurity
return errNoTransportSecurity
}
} else {
for _, cd := range ac.dopts.copts.AuthOptions {
if cd.RequireTransportSecurity() {
return ErrCredentialsMisuse
return errCredentialsMisuse
}
}
}
Expand Down Expand Up @@ -529,10 +530,10 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return ErrConnClosing
return errConnClosing
}
if ac.down != nil {
ac.down(ErrNetworkIO)
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting
Expand All @@ -545,14 +546,14 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
// Adjust timeout for the current try.
copts := ac.dopts.copts
if copts.Timeout < 0 {
ac.tearDown(ErrClientConnTimeout)
return ErrClientConnTimeout
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
if copts.Timeout > 0 {
copts.Timeout -= time.Since(start)
if copts.Timeout <= 0 {
ac.tearDown(ErrClientConnTimeout)
return ErrClientConnTimeout
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
}
sleepTime := ac.dopts.bs.backoff(retries)
Expand All @@ -570,7 +571,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return ErrConnClosing
return errConnClosing
}
ac.errorf("transient failure: %v", err)
ac.state = TransientFailure
Expand All @@ -589,8 +590,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.mu.Lock()
ac.errorf("connection timeout")
ac.mu.Unlock()
ac.tearDown(ErrClientConnTimeout)
return ErrClientConnTimeout
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
closeTransport = false
select {
Expand All @@ -607,7 +608,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return ErrConnClosing
return errConnClosing
}
ac.state = Ready
ac.stateCV.Broadcast()
Expand Down Expand Up @@ -662,7 +663,7 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
switch {
case ac.state == Shutdown:
ac.mu.Unlock()
return nil, ErrConnClosing
return nil, errConnClosing
case ac.state == Ready:
ct := ac.transport
ac.mu.Unlock()
Expand Down Expand Up @@ -699,7 +700,7 @@ func (ac *addrConn) tearDown(err error) {
ac.cc.mu.Unlock()
}()
if ac.down != nil {
ac.down(err)
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil
}
if ac.state == Shutdown {
Expand All @@ -716,7 +717,7 @@ func (ac *addrConn) tearDown(err error) {
ac.ready = nil
}
if ac.transport != nil {
if err == ErrConnDrain {
if err == errConnDrain {
ac.transport.GracefulClose()
} else {
ac.transport.Close()
Expand Down
16 changes: 8 additions & 8 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
if err != errClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout)
}
}

Expand All @@ -61,8 +61,8 @@ func TestTLSDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
if err != errClientConnTimeout {
t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout)
}
}

Expand All @@ -72,12 +72,12 @@ func TestCredentialsMisuse(t *testing.T) {
t.Fatalf("Failed to create credentials %v", err)
}
// Two conflicting credential configurations
if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(creds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != ErrCredentialsMisuse {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, ErrCredentialsMisuse)
if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(creds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errCredentialsMisuse {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsMisuse)
}
// security info on insecure connection
if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(creds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != ErrCredentialsMisuse {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, ErrCredentialsMisuse)
if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(creds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errCredentialsMisuse {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsMisuse)
}
}

Expand Down

0 comments on commit 9dc3da0

Please sign in to comment.