Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(serverv2): integrate gRPC #21038

Merged
merged 14 commits into from
Jul 30, 2024
Next Next commit
refactor grpc query handlers
  • Loading branch information
testinginprod committed Jul 23, 2024
commit 3c0ddc0b516fe905d313176d7bf54522e831b32a
8 changes: 4 additions & 4 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type App[T transaction.Tx] struct {
amino legacy.Amino
moduleManager *MM[T]

// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
}

// Name returns the app name.
Expand Down Expand Up @@ -120,6 +120,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
return a.GRPCQueryDecoders
func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return a.GRPCMethodsToMessageMap
}
11 changes: 5 additions & 6 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (m *MM[T]) assertNoForgottenModules(

func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
grpcQueryDecoders: map[string]func() gogoproto.Message{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
Expand All @@ -567,7 +567,7 @@ func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], re
if err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
app.GRPCQueryDecoders = c.grpcQueryDecoders
app.GRPCMethodsToMessageMap = c.grpcQueryDecoders
return nil
}

Expand All @@ -576,7 +576,7 @@ var _ grpc.ServiceRegistrar = (*configurator)(nil)
type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)
grpcQueryDecoders map[string]func() gogoproto.Message

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
Expand Down Expand Up @@ -618,9 +618,8 @@ func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
return msg, gogoproto.Unmarshal(bytes, msg)
decoderFunc := func() gogoproto.Message {
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
}
c.grpcQueryDecoders[md.MethodName] = decoderFunc
}
Expand Down
12 changes: 7 additions & 5 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Consensus[T transaction.Tx] struct {
streaming streaming.Manager
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC

cfg Config
indexedEvents map[string]struct{}
Expand All @@ -60,6 +59,8 @@ type Consensus[T transaction.Tx] struct {

addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

grpcMethodsMap map[string]func() gogoproto.Message // maps gRPC method to message creator func
}

func NewConsensus[T transaction.Tx](
Expand All @@ -69,7 +70,7 @@ func NewConsensus[T transaction.Tx](
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
gRPCMethodsMap map[string]func() gogoproto.Message,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
Expand All @@ -78,7 +79,7 @@ func NewConsensus[T transaction.Tx](
appName: appName,
version: getCometBFTServerVersion(),
consensusAuthority: consensusAuthority,
grpcQueryDecoders: grpcQueryDecoders,
grpcMethodsMap: gRPCMethodsMap,
app: app,
cfg: cfg,
store: store,
Expand Down Expand Up @@ -173,9 +174,10 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// check if it's a gRPC method
grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path]
makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path]
if isGRPC {
protoRequest, err := grpcQueryDecoder(req.Data)
protoRequest := makeGRPCRequest()
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l
appI.GetAppManager(),
s.serverOptions.Mempool,
indexEvents,
appI.GetGRPCQueryDecoders(),
appI.GetGPRCMethodsToMessageMap(),
appI.GetStore().(types.Store),
s.config,
s.initTxCodec,
Expand Down
2 changes: 1 addition & 1 deletion server/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ type AppI[T transaction.Tx] interface {
InterfaceRegistry() coreapp.InterfaceRegistry
GetAppManager() *appmanager.AppManager[T]
GetConsensusAuthority() string
GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error)
GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message
GetStore() any
}