diff --git a/call.go b/call.go index 0843865244de..4e2687400ad4 100644 --- a/call.go +++ b/call.go @@ -116,7 +116,7 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, } outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload) if err != nil { - return Errorf(codes.Internal, "grpc: %v", err) + return err } err = t.Write(stream, outBuf, opts) if err == nil && outPayload != nil { diff --git a/rpc_util.go b/rpc_util.go index 6a32afdfcc88..b55259cc1d93 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -289,7 +289,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl // TODO(zhaoq): optimize to reduce memory alloc and copying. b, err = c.Marshal(msg) if err != nil { - return nil, err + return nil, Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) } if outPayload != nil { outPayload.Payload = msg @@ -299,7 +299,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl } if cp != nil { if err := cp.Do(cbuf, b); err != nil { - return nil, err + return nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) } b = cbuf.Bytes() } diff --git a/server.go b/server.go index 5a1d4ea1b4aa..91d30d392b96 100644 --- a/server.go +++ b/server.go @@ -640,14 +640,8 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) if err != nil { - // This typically indicates a fatal issue (e.g., memory - // corruption or hardware faults) the application program - // cannot handle. - // - // TODO(zhaoq): There exist other options also such as only closing the - // faulty stream locally and remotely (Other streams can keep going). Find - // the optimal option. - grpclog.Fatalf("grpc: Server failed to encode response %v", err) + grpclog.Println("grpc: server failed to encode response: ", err) + return err } err = t.Write(stream, p, opts) if err == nil && outPayload != nil { diff --git a/stream.go b/stream.go index ec534a017b1d..9cc9fa615efb 100644 --- a/stream.go +++ b/stream.go @@ -359,7 +359,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } }() if err != nil { - return Errorf(codes.Internal, "grpc: %v", err) + return err } err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) if err == nil && outPayload != nil { @@ -588,7 +588,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { } }() if err != nil { - err = Errorf(codes.Internal, "grpc: %v", err) return err } if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 0eee77d01341..796b5457feda 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -450,6 +450,7 @@ type test struct { clientInitialWindowSize int32 clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials + customCodec grpc.Codec // srv and srvAddr are set once startServer is called. srv *grpc.Server @@ -545,6 +546,9 @@ func (te *test) startServer(ts testpb.TestServiceServer) { case "clientTimeoutCreds": sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{})) } + if te.customCodec != nil { + sopts = append(sopts, grpc.CustomCodec(te.customCodec)) + } s := grpc.NewServer(sopts...) te.srv = s if te.e.httpHandler { @@ -625,6 +629,9 @@ func (te *test) clientConn() *grpc.ClientConn { if te.perRPCCreds != nil { opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) } + if te.customCodec != nil { + opts = append(opts, grpc.WithCodec(te.customCodec)) + } var err error te.cc, err = grpc.Dial(te.srvAddr, opts...) if err != nil { @@ -4109,3 +4116,46 @@ func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) { t.Fatalf("Test failed. Reason: %v", err) } } + +type errCodec struct { + noError bool +} + +func (c *errCodec) Marshal(v interface{}) ([]byte, error) { + if c.noError { + return []byte{}, nil + } + return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12") +} + +func (c *errCodec) Unmarshal(data []byte, v interface{}) error { + return nil +} + +func (c *errCodec) String() string { + return "Fermat's near-miss." +} + +func TestEncodeDoesntPanic(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testEncodeDoesntPanic(t, e) + } +} + +func testEncodeDoesntPanic(t *testing.T, e env) { + te := newTest(t, e) + erc := &errCodec{} + te.customCodec = erc + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + te.customCodec = nil + tc := testpb.NewTestServiceClient(te.clientConn()) + // Failure case, should not panic. + tc.EmptyCall(context.Background(), &testpb.Empty{}) + erc.noError = true + // Passing case. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall(_, _) = _, %v, want _, ", err) + } +}