Skip to content

Commit

Permalink
grpc: Move Pick First Balancer to separate package (#7255)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored May 22, 2024
1 parent 1adbea2 commit 1db6590
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 32 deletions.
4 changes: 2 additions & 2 deletions balancer/grpclb/grpclb_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ package grpclb
import (
"encoding/json"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/serviceconfig"
)

const (
roundRobinName = roundrobin.Name
pickFirstName = grpc.PickFirstBalancerName
pickFirstName = pickfirst.Name
)

type grpclbServiceConfig struct {
Expand Down
18 changes: 13 additions & 5 deletions pickfirst.go → balancer/pickfirst/pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*
*/

package grpc
// Package pickfirst contains the pick_first load balancing policy.
package pickfirst

import (
"encoding/json"
Expand All @@ -25,17 +26,24 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

func init() {
balancer.Register(pickfirstBuilder{})
}

var logger = grpclog.Component("pick-first-lb")

const (
// PickFirstBalancerName is the name of the pick_first balancer.
PickFirstBalancerName = "pick_first"
logPrefix = "[pick-first-lb %p] "
// Name is the name of the pick_first balancer.
Name = "pick_first"
logPrefix = "[pick-first-lb %p] "
)

type pickfirstBuilder struct{}
Expand All @@ -47,7 +55,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
}

func (pickfirstBuilder) Name() string {
return PickFirstBalancerName
return Name
}

type pfConfig struct {
Expand Down
6 changes: 4 additions & 2 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
Expand All @@ -46,6 +46,8 @@ import (
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/testdata"

rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand Down Expand Up @@ -919,7 +921,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
}
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &childPolicyConfig{}
Expand Down
4 changes: 2 additions & 2 deletions balancer/rls/internal/test/e2e/rls_child_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"errors"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -68,7 +68,7 @@ type bb struct {
func (bb bb) Name() string { return bb.name }

func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
pf := balancer.Get(grpc.PickFirstBalancerName)
pf := balancer.Get(pickfirst.Name)
b := &bal{
Balancer: pf.Build(cc, opts),
bf: bb.bf,
Expand Down
1 change: 0 additions & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,6 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig

func init() {
balancer.Register(pickfirstBuilder{})
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
Expand Down
4 changes: 2 additions & 2 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -602,7 +602,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
ccs.ResolverState.Addresses = ccs.ResolverState.Addresses[1:]
Expand Down
5 changes: 3 additions & 2 deletions service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
Expand Down Expand Up @@ -183,12 +184,12 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
}
c := rsc.LoadBalancingConfig
if c == nil {
name := PickFirstBalancerName
name := pickfirst.Name
if rsc.LoadBalancingPolicy != nil {
name = *rsc.LoadBalancingPolicy
}
if balancer.Get(name) == nil {
name = PickFirstBalancerName
name = pickfirst.Name
}
cfg := []map[string]any{{name: struct{}{}}}
strCfg, err := json.Marshal(cfg)
Expand Down
19 changes: 10 additions & 9 deletions test/balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
pickfirst "google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/fakegrpclb"
"google.golang.org/grpc/internal/testutils/pickfirst"
pfutil "google.golang.org/grpc/internal/testutils/pickfirst"
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (s) TestBalancerSwitch_Basic(t *testing.T) {
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

Expand All @@ -146,7 +147,7 @@ func (s) TestBalancerSwitch_Basic(t *testing.T) {
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// newly configured backends, as part of the balancer switch.
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
Expand All @@ -220,7 +221,7 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}

Expand All @@ -245,7 +246,7 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
// Switch to "pick_first" again by sending no grpclb server addresses.
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -340,7 +341,7 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -468,7 +469,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) {
waitToProceed := make(chan struct{})
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
pf := balancer.Get(grpc.PickFirstBalancerName)
pf := balancer.Get(pickfirst.Name)
bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
Expand Down Expand Up @@ -503,7 +504,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) {
// underlying "pick_first" balancer which will result in a healthy picker
// being reported to the channel. RPCs should start using the new balancer.
close(waitToProceed)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
if err := pfutil.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
}
3 changes: 2 additions & 1 deletion test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -847,7 +848,7 @@ func (s) TestMetadataInPickResult(t *testing.T) {
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
cc := &testCCWrapper{ClientConn: bd.ClientConn}
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(cc, bd.BuildOptions)
bd.Data = balancer.Get(pickfirst.Name).Build(cc, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
Expand Down
3 changes: 2 additions & 1 deletion test/resolver_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
ccUpdateCh := testutils.NewChannel()
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
pf := balancer.Get(grpc.PickFirstBalancerName)
pf := balancer.Get(pickfirst.Name)
bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
},
ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -181,7 +182,7 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// Create a wrapped pickfirst LB policy. When the endpoint picking policy on
// the cluster resource is changed to pickfirst, this will allow us to
// verify that load balancing configuration is pushed to it.
pfBuilder := balancer.Get(grpc.PickFirstBalancerName)
pfBuilder := balancer.Get(pickfirst.Name)
internal.BalancerUnregister(pfBuilder.Name())

lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdslbregistry/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"fmt"
"strings"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/leastrequest"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -110,7 +110,7 @@ func convertPickFirstProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessa
if err != nil {
return nil, fmt.Errorf("error marshaling JSON for type %T: %v", pfCfg, err)
}
return makeBalancerConfigJSON(grpc.PickFirstBalancerName, js), nil
return makeBalancerConfigJSON(pickfirst.Name, js), nil
}

func convertRoundRobinProtoToServiceConfig([]byte, int) (json.RawMessage, error) {
Expand Down

0 comments on commit 1db6590

Please sign in to comment.