Skip to content

Commit

Permalink
codec/proto: reuse of marshal byte buffers (grpc#3167)
Browse files Browse the repository at this point in the history
Performance benchmarks can be found below. Obviously, a 8 KiB
request/response is tailored to showcase this improvement as this is
where codec buffer reuse shines, but I've run other benchmarks too (like
1-byte requests and responses) and there's no discernable impact on
performance.

We do not allow reuse of buffers when stat handlers or binlogs are
turned on. This is because those two may need access to the data and
payload even after the data has been written to the wire. In such cases,
we never return the data back to the pool.

A buffer reuse threshold of 1 KiB was determined after several
experiments. There's diminished returns when buffer reuse is enabled for
smaller messages (actually, a negative impact).

unary-networkMode_none-bufConn_false-keepalive_false-benchTime_40s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_6-reqSize_8192B-respSize_8192B-compressor_off-channelz_false-preloader_false
               Title       Before        After Percentage
            TotalOps       839638       906223     7.93%
             SendOps            0            0      NaN%
             RecvOps            0            0      NaN%
            Bytes/op    103788.29     80592.47   -22.35%
           Allocs/op       183.33       189.30     3.27%
             ReqT/op 1375662899.20 1484755763.20     7.93%
            RespT/op 1375662899.20 1484755763.20     7.93%
            50th-Lat    238.746µs    225.019µs    -5.75%
            90th-Lat    514.253µs    456.439µs   -11.24%
            99th-Lat    711.083µs    702.466µs    -1.21%
             Avg-Lat     285.45µs    264.456µs    -7.35%
  • Loading branch information
Adhityaa Chandrasekar authored and dfawley committed Dec 20, 2019
1 parent ffcdcbc commit 6426751
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 89 deletions.
9 changes: 7 additions & 2 deletions Documentation/encoding.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ into bytes and vice-versa for the purposes of network transmission.
## Codecs (Serialization and Deserialization)

A `Codec` contains code to serialize a message into a byte slice (`Marshal`) and
deserialize a byte slice back into a message (`Unmarshal`). `Codec`s are
registered by name into a global registry maintained in the `encoding` package.
deserialize a byte slice back into a message (`Unmarshal`). Optionally, a
`ReturnBuffer` method to potentially reuse the byte slice returned by the
`Marshal` method may also be implemented; note that this is an experimental
feature with an API that is still in flux.

`Codec`s are registered by name into a global registry maintained in the
`encoding` package.

### Implementing a `Codec`

Expand Down
14 changes: 14 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ type baseCodec interface {
Unmarshal(data []byte, v interface{}) error
}

// A bufferReturner requires a ReturnBuffer method to be implemented. Once a
// Marshal caller is done with the returned byte buffer, they can choose to
// return it back to the encoding library for re-use using this method.
type bufferReturner interface {
// If implemented in a codec, this function may be called with the byte
// buffer returned by Marshal after gRPC is done with the buffer.
//
// gRPC will not call ReturnBuffer after it's done with the buffer if any of
// the following is true:
// 1. Stats handlers are used.
// 2. Binlogs are enabled.
ReturnBuffer(buf []byte)
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)

Expand Down
5 changes: 5 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func GetCompressor(name string) Compressor {
// Codec defines the interface gRPC uses to encode and decode messages. Note
// that implementations of this interface must be thread safe; a Codec's
// methods can be called from concurrent goroutines.
//
// Optionally, if a ReturnBuffer(buf []byte) is implemented, it may be called
// to return the byte slice it received from the Marshal function after gRPC is
// done with it. The codec may reuse this byte slice in a future Marshal
// operation to reduce the application's memory footprint.
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
Expand Down
68 changes: 35 additions & 33 deletions encoding/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package proto

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -38,29 +37,16 @@ func init() {
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}

type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}

func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}

func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
newSlice := returnBufferPool.Get().([]byte)

cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
pb.SetBuf(newSlice)
pb.Reset()
if err := pb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
out := pb.Bytes()
return out, nil
}

Expand All @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) {
return pm.Marshal()
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := marshal(v, cb)
pb := protoBufferPool.Get().(*proto.Buffer)
out, err := marshal(v, pb)

// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return out, err
}

Expand All @@ -88,23 +74,39 @@ func (codec) Unmarshal(data []byte, v interface{}) error {
return pu.Unmarshal(data)
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb := protoBufferPool.Get().(*proto.Buffer)
pb.SetBuf(data)
err := pb.Unmarshal(protoMsg)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return err
}

func (codec) ReturnBuffer(data []byte) {
// Make sure we set the length of the buffer to zero so that future appends
// will start from the zeroeth byte, not append to the previous, stale data.
//
// Apparently, sync.Pool with non-pointer objects (slices, in this case)
// causes small allocations because of how interface{} works under the hood.
// This isn't a problem for us, however, because we're more concerned with
// _how_ much that allocation is. Ideally, we'd be using bytes.Buffer as the
// Marshal return value to remove even that allocation, but we can't change
// the Marshal interface at this point.
returnBufferPool.Put(data[:0])
}

func (codec) Name() string {
return Name
}

var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
return &proto.Buffer{}
},
}

var returnBufferPool = &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 16)
},
}
50 changes: 50 additions & 0 deletions encoding/proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,53 @@ func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
}
}
}

func TestBufferReuse(t *testing.T) {
c := codec{}

marshal := func(toMarshal []byte) []byte {
protoIn := &codec_perf.Buffer{Body: toMarshal}
b, err := c.Marshal(protoIn)
if err != nil {
t.Errorf("codec.Marshal(%v) failed: %v", protoIn, err)
}
// We cannot expect the actual pointer to be the same because sync.Pool
// during GC pauses.
bc := append([]byte(nil), b...)
c.ReturnBuffer(b)
return bc
}

unmarshal := func(b []byte) []byte {
protoOut := &codec_perf.Buffer{}
if err := c.Unmarshal(b, protoOut); err != nil {
t.Errorf("codec.Unarshal(%v) failed: %v", protoOut, err)
}
return protoOut.GetBody()
}

check := func(in []byte, out []byte) {
if len(in) != len(out) {
t.Errorf("unequal lengths: len(in=%v)=%d, len(out=%v)=%d", in, len(in), out, len(out))
}

for i := 0; i < len(in); i++ {
if in[i] != out[i] {
t.Errorf("unequal values: in[%d] = %v, out[%d] = %v", i, in[i], i, out[i])
}
}
}

// To test that the returned buffer does not have unexpected data at the end,
// we use a second input data that is smaller than the first.
in1 := []byte{1, 2, 3}
b1 := marshal(in1)
in2 := []byte{4, 5}
b2 := marshal(in2)

out1 := unmarshal(b1)
out2 := unmarshal(b2)

check(in1, out1)
check(in2, out2)
}
18 changes: 16 additions & 2 deletions internal/leakcheck/leakcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"runtime"
"sort"
"strings"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -74,11 +75,24 @@ func ignore(g string) bool {
return false
}

var lastStacktraceSize uint32 = 4 << 10

// interestingGoroutines returns all goroutines we care about for the purpose of
// leak checking. It excludes testing or runtime ones.
func interestingGoroutines() (gs []string) {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
n := atomic.LoadUint32(&lastStacktraceSize)
buf := make([]byte, n)
for {
nb := uint32(runtime.Stack(buf, true))
if nb < uint32(len(buf)) {
buf = buf[:nb]
break
}
n <<= 1
buf = make([]byte, n)
}
atomic.StoreUint32(&lastStacktraceSize, n)

for _, g := range strings.Split(string(buf), "\n\n") {
if !ignore(g) {
gs = append(gs, g)
Expand Down
33 changes: 24 additions & 9 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
}

type itemNode struct {
it interface{}
next *itemNode
it interface{}
onDequeue func()
next *itemNode
}

type itemList struct {
head *itemNode
tail *itemNode
}

func (il *itemList) enqueue(i interface{}) {
n := &itemNode{it: i}
func (il *itemList) enqueue(i interface{}, onDequeue func()) {
n := &itemNode{it: i, onDequeue: onDequeue}
if il.tail == nil {
il.head, il.tail = n, n
return
Expand All @@ -63,11 +64,14 @@ func (il *itemList) dequeue() interface{} {
if il.head == nil {
return nil
}
i := il.head.it
i, onDequeue := il.head.it, il.head.onDequeue
il.head = il.head.next
if il.head == nil {
il.tail = nil
}
if onDequeue != nil {
onDequeue()
}
return i
}

Expand Down Expand Up @@ -136,6 +140,7 @@ type dataFrame struct {
// onEachWrite is called every time
// a part of d is written out.
onEachWrite func()
rb *ReturnBuffer
}

func (*dataFrame) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -329,7 +334,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
wakeUp = true
c.consumerWaiting = false
}
c.list.enqueue(it)
c.list.enqueue(it, nil)
if it.isTransportResponseFrame() {
c.transportResponseFrames++
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
Expand Down Expand Up @@ -616,7 +621,7 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {

if str.state != empty { // either active or waiting on stream quota.
// add it str's list of items.
str.itl.enqueue(h)
str.itl.enqueue(h, nil)
return nil
}
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
Expand All @@ -631,7 +636,7 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
itl: &itemList{},
wq: h.wq,
}
str.itl.enqueue(h)
str.itl.enqueue(h, nil)
return l.originateStream(str)
}

Expand Down Expand Up @@ -702,7 +707,11 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
}
// If we got data for a stream it means that
// stream was originated and the headers were sent out.
str.itl.enqueue(df)
var onDequeue func()
if df.rb != nil {
onDequeue = df.rb.Done
}
str.itl.enqueue(df, onDequeue)
if str.state == empty {
str.state = active
l.activeStreams.enqueue(str)
Expand All @@ -726,6 +735,12 @@ func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequ
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
c.onWrite()
if str, ok := l.estdStreams[c.streamID]; ok {
// Dequeue all items from the stream's item list. This would call any pending onDequeue functions.
if str.state == active {
for !str.itl.isEmpty() {
str.itl.dequeue()
}
}
// On the server side it could be a trailers-only response or
// a RST_STREAM before stream initialization thus the stream might
// not be established yet.
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
rb: opts.ReturnBuffer,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
Expand All @@ -862,6 +863,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
return err
}
}
if df.rb != nil {
df.rb.Add(1)
}
return t.controlBuf.put(df)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
rb: opts.ReturnBuffer,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
Expand All @@ -932,6 +933,9 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
}
return ContextErr(s.ctx.Err())
}
if df.rb != nil {
df.rb.Add(1)
}
return t.controlBuf.put(df)
}

Expand Down
Loading

0 comments on commit 6426751

Please sign in to comment.