Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: add V2Picker, ClientConn.UpdateState, SubConnState.ConnectionError #3186

Merged
merged 16 commits into from
Nov 21, 2019
Prev Previous commit
Next Next commit
review changes
  • Loading branch information
dfawley committed Nov 20, 2019
commit ad3e0dcaf4044b96efc3dc65dbca502fa5bec47f
49 changes: 24 additions & 25 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ import (
"google.golang.org/grpc/status"
)

// v2PickerWrapper wraps a balancer.Picker while providing the
// balancer.V2Picker API. It requires a pickerWrapper to generate errors
// including the latest connectionError. To be deleted when balancer.Picker is
// updated to the balancer.V2Picker API.
type v2PickerWrapper struct {
picker balancer.Picker
pw *pickerWrapper // for accessing the latest connection error
picker balancer.Picker
connErr *connErr
}

func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
sc, done, err := v.picker.Pick(info.Ctx, info)
if err != nil {
if err == balancer.ErrTransientFailure {
return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.pw.connectionError()))
return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
}
return balancer.PickResult{}, err
}
Expand All @@ -58,39 +62,34 @@ type pickerWrapper struct {

// The latest connection error. TODO: remove when V1 picker is deprecated;
// balancer should be responsible for providing the error.
connErrMu sync.Mutex
connErr error
*connErr
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
type connErr struct {
mu sync.Mutex
err error
}

func (pw *pickerWrapper) updateConnectionError(err error) {
pw.connErrMu.Lock()
pw.connErr = err
pw.connErrMu.Unlock()
func (c *connErr) updateConnectionError(err error) {
c.mu.Lock()
c.err = err
c.mu.Unlock()
}

func (pw *pickerWrapper) connectionError() error {
pw.connErrMu.Lock()
err := pw.connErr
pw.connErrMu.Unlock()
func (c *connErr) connectionError() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return
}
pw.picker = &v2PickerWrapper{picker: p, pw: pw}
// pw.blockingCh should never be nil.
close(pw.blockingCh)
pw.blockingCh = make(chan struct{})
pw.mu.Unlock()
pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Expand Down