Skip to content

Commit

Permalink
reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 26, 2017
1 parent e5af9ad commit 23c3be9
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 76 deletions.
2 changes: 1 addition & 1 deletion balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type PickOptions struct{}
type DoneInfo struct {
// Err is the rpc error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any byte has been sent to the server.
// BytesSent indicates if any bytes have been sent to the server.
BytesSent bool
// BytesReceived indicates if any byte has been received from the server.
BytesReceived bool
Expand Down
13 changes: 9 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,23 @@ func WithTimeout(d time.Duration) DialOption {
}
}

func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = f
}
}

// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
return withContextDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return f(addr, deadline.Sub(time.Now()))
}
return f(addr, 0)
}
}
})
}

// WithStatsHandler returns a DialOption that specifies the stats handler
Expand Down
17 changes: 4 additions & 13 deletions grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,7 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
// if no connection to remote balancers was successful.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
timer := time.NewTimer(fallbackTimeout)
defer func() {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
}()
defer timer.Stop()
select {
case <-timer.C:
case <-lb.doneCh:
Expand All @@ -280,10 +273,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
return
}

var (
remoteBalancerAddrs []resolver.Address
backendAddrs []resolver.Address
)
var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
Expand All @@ -294,7 +284,8 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {

if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
grpclog.Fatalf("grpclb: no remote balancer address is available, should never happen")
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
Expand Down
43 changes: 20 additions & 23 deletions grpclb_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)

Expand All @@ -39,7 +38,7 @@ type rpcStats struct {
NumCallsFinishedKnownReceived int64
}

// toClientStats converts rpcStats to lbpb.ClientStats, and clear rpcStats.
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0),
Expand Down Expand Up @@ -80,11 +79,11 @@ type lbPicker struct {
// If err is not nil, Pick always returns this err.
err error

mu sync.Mutex
serverList []*lbpb.Server
nextSL int
readySCs []balancer.SubConn
nextSC int
mu sync.Mutex
serverList []*lbpb.Server
serverListNext int
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int

stats *rpcStats
}
Expand All @@ -95,17 +94,19 @@ type lbPicker struct {
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all backends.
// Second layer: roundrobin on all READY backends.
func newPicker(err error, serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *rpcStats) *lbPicker {
grpclog.Infof("grpclb: newPicker called with: %v, %v, %v", err, serverList, readySCs)
if err != nil {
return &lbPicker{
err: err,
}
return &lbPicker{err: err}
}

if len(serverList) <= 0 && len(readySCs) <= 0 {
return &lbPicker{err: balancer.ErrNoSubConnAvailable}
}

return &lbPicker{
serverList: serverList,
readySCs: readySCs,
subConns: readySCs,
stats: stats,
}
}
Expand All @@ -115,10 +116,6 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn ba
return nil, nil, p.err
}

if len(p.serverList) <= 0 && len(p.readySCs) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}

p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -136,8 +133,8 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn ba
}
}()

s := p.serverList[p.nextSL]
p.nextSL = (p.nextSL + 1) % len(p.serverList)
s := p.serverList[p.serverListNext]
p.serverListNext = (p.serverListNext + 1) % len(p.serverList)

// If it's a drop, return an error and fail the RPC.
if s.DropForRateLimiting {
Expand All @@ -150,11 +147,11 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn ba
}
}

// Else, do roundrobin on readySCs.
if len(p.readySCs) <= 0 {
// Roundrobin on readySCs.
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
sc := p.readySCs[p.nextSC]
p.nextSC = (p.nextSC + 1) % len(p.readySCs)
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
return sc, nil, nil
}
54 changes: 19 additions & 35 deletions grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"net"
"reflect"
"sync"
"time"

"golang.org/x/net/context"
Expand All @@ -34,21 +33,18 @@ import (
"google.golang.org/grpc/resolver"
)

// processServerList recalculates the drop rate and send the backend server
// addresses to roundrobin to regenerate the roundrobin picker.
//
// - If the new server list == old server list, do nothing and return.
// - Else if the new backends == old backends, generate a new grpclb picker
// with the new server list and the old RR picker.
// - Else, send updates to RR, and don't update grpclb picker. The grpclb
// picker will be updated when the RR updates the RR picker.
// processServerList updates balaner's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
// TODO(bargrpclb) see the comment.
grpclog.Infof("lbBalancer: processing server list: %+v", l)
lb.mu.Lock()
defer lb.mu.Unlock()

// Set serverListReceived to true so fallback will not take effect if it has
// not hit timeout.
lb.serverListReceived = true

// If the new server list == old server list, do nothing.
if reflect.DeepEqual(lb.fullServerList, l.Servers) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
return
Expand Down Expand Up @@ -77,6 +73,7 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
backendAddrs = append(backendAddrs, addr)
}

// Call refreshSubConns to create/remove SubConns.
backendsUpdated := lb.refreshSubConns(backendAddrs)
// If no backend was updated, no SubConn will be newed/removed. But since
// the full serverList was different, there might be updates in drops or
Expand All @@ -90,7 +87,7 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {

// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether SubConn was newed/removed).
// backendAddrs (whether any SubConn was newed/removed).
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
lb.backendAddrs = nil
Expand Down Expand Up @@ -135,8 +132,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
return backendsUpdated
}

func (lb *lbBalancer) readServerList(s *balanceLoadClientStream, done chan<- struct{}) {
defer close(done)
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) {
for {
reply, err := s.Recv()
if err != nil {
Expand All @@ -149,13 +145,13 @@ func (lb *lbBalancer) readServerList(s *balanceLoadClientStream, done chan<- str
}
}

func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-done:
case <-s.Context().Done():
return
}
stats := lb.clientStats.toClientStats()
Expand Down Expand Up @@ -215,34 +211,21 @@ func (lb *lbBalancer) watchRemoteBalancer() {
continue
}
if initResp.LoadBalancerDelegate != "" {
grpclog.Fatalf("TODO: Delegation is not supported yet.")
grpclog.Errorf("Delegation is not supported.")
continue
}

// streamDone will be closed by the readServerList goroutine when
// there's an error. So the sendLoadReport goroutine won't block on
// time.Ticker forever.
streamDone := make(chan struct{})

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lb.readServerList(stream, streamDone)
}()
go func() {
defer wg.Done()
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d, streamDone)
lb.sendLoadReport(stream, d)
}
}()
wg.Wait()
lb.readServerList(stream)
}
}

func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var (
dopts []DialOption
)
var dopts []DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, WithTransportCredentials(creds))
Expand All @@ -254,8 +237,9 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
dopts = append(dopts, WithInsecure())
}
if lb.opt.Dialer != nil {
// WithDialer takes a different type of function, so we instead use a special DialOption here.
dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = lb.opt.Dialer })
// WithDialer takes a different type of function, so we instead use a
// special DialOption here.
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder()))
Expand Down

0 comments on commit 23c3be9

Please sign in to comment.