Skip to content

Commit

Permalink
xds: Implement circuit breaking support. (grpc#4050)
Browse files Browse the repository at this point in the history
  • Loading branch information
GarrettGutierrez1 authored Dec 8, 2020
1 parent 750abe8 commit ff16195
Show file tree
Hide file tree
Showing 12 changed files with 518 additions and 10 deletions.
3 changes: 3 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"

xdsclient "google.golang.org/grpc/xds/internal/client"
Expand Down Expand Up @@ -328,6 +329,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
return
}

client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests)

// The first good update from the watch API leads to the instantiation of an
// edsBalancer. Further updates/errors are propagated to the existing
// edsBalancer.
Expand Down
37 changes: 37 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
Expand Down Expand Up @@ -574,6 +575,42 @@ func (s) TestUpdateSubConnState(t *testing.T) {
}
}

// TestCircuitBreaking verifies that the CDS balancer correctly updates a
// service's counter on watch updates.
func (s) TestCircuitBreaking(t *testing.T) {
// This creates a CDS balancer, pushes a ClientConnState update with a fake
// xdsClient, and makes sure that the CDS balancer registers a watch on the
// provided xdsClient.
xdsC, cdsB, edsB, _, cancel := setupWithXDSCreds(t)
defer func() {
cancel()
cdsB.Close()
}()

// Here we invoke the watch callback registered on the fake xdsClient. This
// will trigger the watch handler on the CDS balancer, which will update
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName, MaxRequests: &maxRequests}
wantCCS := edsCCS(serviceName, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}

// Since the counter's max requests was set to 1, the first request should
// succeed and the second should fail.
counter := client.GetServiceRequestsCounter(serviceName)
if err := counter.StartRequest(); err != nil {
t.Fatal(err)
}
if err := counter.StartRequest(); err == nil {
t.Fatal("unexpected success on start request over max")
}
counter.EndRequest()
}

// TestClose verifies the Close() method in the the CDS balancer.
func (s) TestClose(t *testing.T) {
// This creates a CDS balancer, pushes a ClientConnState update with a fake
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type edsBalancerImplInterface interface {
handleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
// updateState handle a balancer state update from the priority.
updateState(priority priorityType, s balancer.State)
// updateServiceRequestsCounter updates the service requests counter to the
// one for the given service name.
updateServiceRequestsCounter(serviceName string)
// close closes the eds balancer.
close()
}
Expand Down Expand Up @@ -212,6 +215,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
x.logger.Warningf("failed to update xDS client: %v", err)
}

x.edsImpl.updateServiceRequestsCounter(cfg.EDSServiceName)

// We will update the edsImpl with the new child policy, if we got a
// different one.
if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy, cmpopts.EquateEmpty()) {
Expand Down
47 changes: 40 additions & 7 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/env"
)

// TODO: make this a environment variable?
Expand Down Expand Up @@ -92,10 +94,11 @@ type edsBalancerImpl struct {
subConnMu sync.Mutex
subConnToPriority map[balancer.SubConn]priorityType

pickerMu sync.Mutex
dropConfig []xdsclient.OverloadDropConfig
drops []*dropper
innerState balancer.State // The state of the picker without drop support.
pickerMu sync.Mutex
dropConfig []xdsclient.OverloadDropConfig
drops []*dropper
innerState balancer.State // The state of the picker without drop support.
serviceRequestsCounter *client.ServiceRequestsCounter
}

// newEDSBalancerImpl create a new edsBalancerImpl.
Expand Down Expand Up @@ -170,7 +173,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC
// Update picker with old inner picker, new drops.
edsImpl.cc.UpdateState(balancer.State{
ConnectivityState: edsImpl.innerState.ConnectivityState,
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter)},
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter)},
)
}
edsImpl.pickerMu.Unlock()
Expand Down Expand Up @@ -389,6 +392,16 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s
}
}

// updateConfig handles changes to the circuit breaking configuration.
func (edsImpl *edsBalancerImpl) updateServiceRequestsCounter(serviceName string) {
if !env.CircuitBreakingSupport {
return
}
if edsImpl.serviceRequestsCounter == nil || edsImpl.serviceRequestsCounter.ServiceName != serviceName {
edsImpl.serviceRequestsCounter = client.GetServiceRequestsCounter(serviceName)
}
}

// updateState first handles priority, and then wraps picker in a drop picker
// before forwarding the update.
func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) {
Expand All @@ -403,7 +416,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St
defer edsImpl.pickerMu.Unlock()
edsImpl.innerState = s
// Don't reset drops when it's a state change.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter)})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter)})
}
}

Expand Down Expand Up @@ -455,13 +468,15 @@ type dropPicker struct {
drops []*dropper
p balancer.Picker
loadStore load.PerClusterReporter
counter *client.ServiceRequestsCounter
}

func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, counter *client.ServiceRequestsCounter) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
loadStore: loadStore,
counter: counter,
}
}

Expand All @@ -483,6 +498,24 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
}
if d.counter != nil {
if err := d.counter.StartRequest(); err != nil {
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.p.Pick(info)
if err != nil {
d.counter.EndRequest()
return pr, err
}
oldDone := pr.Done
pr.Done = func(doneInfo balancer.DoneInfo) {
d.counter.EndRequest()
if oldDone != nil {
oldDone(doneInfo)
}
}
return pr, err
}
// TODO: (eds) don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
return d.p.Pick(info)
Expand Down
65 changes: 64 additions & 1 deletion xds/internal/balancer/edsbalancer/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/testutils"
)

Expand Down Expand Up @@ -550,6 +552,67 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
}
}

func (s) TestEDS_CircuitBreaking(t *testing.T) {
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()

cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb.updateServiceRequestsCounter("test")
var maxRequests uint32 = 50
client.SetMaxRequests("test", &maxRequests)

// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)

// Picks with drops.
dones := []func(){}
p := <-cc.NewPickerCh
for i := 0; i < 100; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if i < 50 && err != nil {
t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
} else if i > 50 && err == nil {
t.Errorf("The second 50%% picks should be drops, got error <nil>")
}
dones = append(dones, func() {
if pr.Done != nil {
pr.Done(balancer.DoneInfo{})
}
})
}

for _, done := range dones {
done()
}
dones = []func(){}

// Pick without drops.
for i := 0; i < 50; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if err != nil {
t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
}
dones = append(dones, func() {
if pr.Done != nil {
pr.Done(balancer.DoneInfo{})
}
})
}

// Without this, future tests with the same service name will fail.
for _, done := range dones {
done()
}
}

func init() {
balancer.Register(&testInlineUpdateBalancerBuilder{})
}
Expand Down Expand Up @@ -656,7 +719,7 @@ func (s) TestDropPicker(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

p := newDropPicker(constPicker, tt.drops, nil)
p := newDropPicker(constPicker, tt.drops, nil, nil)

// scCount is the number of sc's returned by pick. The opposite of
// drop-count.
Expand Down
53 changes: 53 additions & 0 deletions xds/internal/balancer/edsbalancer/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type fakeEDSBalancer struct {
childPolicy *testutils.Channel
subconnStateChange *testutils.Channel
edsUpdate *testutils.Channel
serviceName *testutils.Channel
}

func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
Expand All @@ -131,6 +132,10 @@ func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {

func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}

func (f *fakeEDSBalancer) updateServiceRequestsCounter(serviceName string) {
f.serviceName.Send(serviceName)
}

func (f *fakeEDSBalancer) close() {}

func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error {
Expand Down Expand Up @@ -169,12 +174,25 @@ func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xds
return nil
}

func (f *fakeEDSBalancer) waitForCounterUpdate(ctx context.Context, wantServiceName string) error {
val, err := f.serviceName.Receive(ctx)
if err != nil {
return err
}
gotServiceName := val.(string)
if gotServiceName != wantServiceName {
return fmt.Errorf("got serviceName %v, want %v", gotServiceName, wantServiceName)
}
return nil
}

func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface {
return &fakeEDSBalancer{
cc: cc,
childPolicy: testutils.NewChannelWithSize(10),
subconnStateChange: testutils.NewChannelWithSize(10),
edsUpdate: testutils.NewChannelWithSize(10),
serviceName: testutils.NewChannelWithSize(10),
}
}

Expand Down Expand Up @@ -307,6 +325,9 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) {
if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil {
t.Fatal(err)
}
if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil {
t.Fatal(err)
}

lbCfgB := &loadBalancingConfig{
Name: fakeBalancerB,
Expand All @@ -323,6 +344,11 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) {
if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil {
t.Fatal(err)
}
if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil {
// Counter is updated even though the service name didn't change. The
// eds_impl will compare the service names, and skip if it didn't change.
t.Fatal(err)
}
}

// TestSubConnStateChange verifies if the top-level edsBalancer passes on
Expand Down Expand Up @@ -566,6 +592,33 @@ func (s) TestClientWatchEDS(t *testing.T) {
}
}

// TestCounterUpdate verifies that the counter update is triggered with the
// service name from an update's config.
func (s) TestCounterUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
_, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()

// Update should trigger counter update with provided service name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"},
}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer).waitForCounterUpdate(ctx, "foobar-1"); err != nil {
t.Fatal(err)
}
}

func (s) TestBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
Expand Down
2 changes: 2 additions & 0 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ type ClusterUpdate struct {
EnableLRS bool
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxRequests for circuit breaking, if any (otherwise nil).
MaxRequests *uint32
}

// OverloadDropConfig contains the config to drop overloads.
Expand Down
Loading

0 comments on commit ff16195

Please sign in to comment.