Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New grpclb implementation #1558

Merged
merged 9 commits into from
Nov 27, 2017
Merged

New grpclb implementation #1558

merged 9 commits into from
Nov 27, 2017

Conversation

menghanl
Copy link
Contributor

@menghanl menghanl commented Oct 3, 2017

Also add new feature:

  • Fallback to backends if remote balancer is unavailable

$new-bar-2$

@@ -128,6 +128,10 @@ type PickOptions struct{}
type DoneInfo struct {
// Err is the rpc error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any byte has been sent to the server.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*"bytes have"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

call.go Outdated
@@ -251,13 +251,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
stream, err = t.NewStream(ctx, callHdr)
if err != nil {
if done != nil {
doneInfo := balancer.DoneInfo{Err: err}
if _, ok := err.(transport.ConnectionError); ok {
// If error is connection error, transport was sending data on wire,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not true (any longer?).

NewStream only returns errors before it attempts to write to the network. I.e. this block should be deleted (and is by #1597).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping? Bytes not sent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rebased on top of the retry PR. This was removed.

call.go Outdated
done(balancer.DoneInfo{Err: err})
doneInfo := balancer.DoneInfo{
Err: err,
BytesSent: stream.BytesSent(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, if there is no error from NewStream, we always attempted to write bytes to the wire. So this should be "true". (And below.)

grpclb_picker.go Outdated
NumCallsFinishedKnownReceived int64
}

// toClientStats converts rpcStats to lbpb.ClientStats, and clear rpcStats.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*clearS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb_picker.go Outdated

mu sync.Mutex
serverList []*lbpb.Server
nextSL int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is nextSL and nextSC? Please document.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
if l == nil {
// regeneratePicker takes a snapshot of the balancer, and generate a picker from
// it. The picker
// - always return ErrTransientFailure if the balancer is in TransientFailure,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*returnS
and doES below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
timer := time.NewTimer(fallbackTimeout)
defer func() {
if !timer.Stop() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, timer is a local, so you don't even have to stop it if you don't want. You definitely don't have to check its return value and drain its channel. That's only if you want to reuse it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
// Has iterated all the possible address but none is connected.
break
}
var (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var remoteBalancerAddrs, backendAddrs []resolver.Address

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
b.mu.Unlock()
if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
grpclog.Fatalf("grpclb: no remote balancer address is available, should never happen")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return an error instead anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

We don't check this returned error, though...

}
if b.w != nil {
b.w.Close()
close(lb.doneCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still racy - is that OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close() is guaranteed to be called from one goroutine (never in parallel).
So this should be OK.

Copy link
Contributor Author

@menghanl menghanl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. PTAL.

@@ -128,6 +128,10 @@ type PickOptions struct{}
type DoneInfo struct {
// Err is the rpc error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any byte has been sent to the server.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
}
// NewLBBuilder creates a builder for grpclb.
func NewLBBuilder() balancer.Builder {
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout.
//
// Only call this function when a non-default fallback timeout is needed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also be used to override the registered grpclb builder with a new fallback timeout.
Updated the comment.

grpclb.go Outdated
}

func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
// regeneratePicker takes a snapshot of the balancer, and generate a picker from
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclb.go Outdated
if l == nil {
// regeneratePicker takes a snapshot of the balancer, and generate a picker from
// it. The picker
// - always return ErrTransientFailure if the balancer is in TransientFailure,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

continue
}
if initResp.LoadBalancerDelegate != "" {
grpclog.Fatalf("TODO: Delegation is not supported yet.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Errorf and continue.

}

func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

grpclog.Fatalf("TODO: Delegation is not supported yet.")
}

// streamDone will be closed by the readServerList goroutine when
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, right, context.
There's no need to create a new context here.
SendLoadReport() should block on the stream's context.

// time.Ticker forever.
streamDone := make(chan struct{})

var wg sync.WaitGroup
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

dopts = append(dopts, WithInsecure())
}
if lb.opt.Dialer != nil {
// WithDialer takes a different type of function, so we instead use a special DialOption here.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.
Added a unexported withContextDialer()

call.go Outdated
@@ -251,13 +251,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
stream, err = t.NewStream(ctx, callHdr)
if err != nil {
if done != nil {
doneInfo := balancer.DoneInfo{Err: err}
if _, ok := err.(transport.ConnectionError); ok {
// If error is connection error, transport was sending data on wire,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping? Bytes not sent.

grpclb.go Outdated
}
// NewLBBuilder creates a builder for grpclb.
func NewLBBuilder() balancer.Builder {
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not done?

grpclb.go Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout.
//
// Only call this function when a non-default fallback timeout is needed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a fallback timeout? OK, I think I know what it means, but can you explain in the comment?

grpclb.go Outdated
// The ClientConn to talk to the remote balancer.
ccRemoteLB *ClientConn

mu sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this guard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything following until the blank line...

I move the code around and added a comment.

grpclb.go Outdated
}

func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
// regeneratePicker takes a snapshot of the balancer, and generate a picker from
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not done? Maybe your latest updates aren't pushed?

Copy link
Contributor Author

@menghanl menghanl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. PTAL.

The "Bytes not sent" was about the byteSent boolean variable you removed in your retry change.
I rebased this PR on the retry commit, so it's fixed now.

grpclb.go Outdated
}

func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
// regeneratePicker takes a snapshot of the balancer, and generate a picker from
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done again... And also the following one.

grpclb.go Outdated
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout.
//
// Only call this function when a non-default fallback timeout is needed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

grpclb.go Outdated
}
// NewLBBuilder creates a builder for grpclb.
func NewLBBuilder() balancer.Builder {
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done again...

grpclb.go Outdated
// The ClientConn to talk to the remote balancer.
ccRemoteLB *ClientConn

mu sync.Mutex
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything following until the blank line...

I move the code around and added a comment.

split lbPicker into lbPicker and errPicker
grpclb_remote_balancer.go go style and supress error logs
add rrpicker
for {
select {
case <-lb.doneCh:
return
default:
if remoteBalancerErr != nil {
grpclog.Error(remoteBalancerErr)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remoteBalancerErr = nil?

@menghanl menghanl merged commit 2ef021f into grpc:master Nov 27, 2017
@menghanl menghanl deleted the bar_new_grpclb branch November 27, 2017 21:35
@menghanl menghanl mentioned this pull request Nov 29, 2017
8 tasks
@lock lock bot locked as resolved and limited conversation to collaborators Jan 18, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants