Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use proto.Buffer API for protobuf codec and cache proto.Buffer structs #1010

Merged
merged 27 commits into from
Apr 13, 2017
Merged
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e946d53
use a global sharded pool of proto.Buffer caches in protoCodec
apolcyn Nov 17, 2016
f8e2d0b
fix goimports
apolcyn Dec 5, 2016
d9756fc
make global buffer pool index counter atomic
apolcyn Dec 9, 2016
3ab447e
hack to remove alloc in encode_len_struct
apolcyn Dec 28, 2016
3a7b0e8
remove extra slice alloc in proto codec marshal
apolcyn Dec 28, 2016
0f18cff
replce magic number for proto size field length with constant
apolcyn Dec 28, 2016
0b1ccd4
replace custom cache with sync.Pool
apolcyn Dec 30, 2016
a1e068f
remove 1 line functions in codec.go and add protoCodec microbenchmarks
apolcyn Feb 7, 2017
a21119d
add concurrent usage test for protoCodec
apolcyn Feb 7, 2017
e16c62c
fix golint.gofmt,goimport checks
apolcyn Feb 7, 2017
586bba2
fix issues in codec.go and codec_test.go
apolcyn Feb 10, 2017
510b880
use go parallel benchmark helpers
apolcyn Feb 12, 2017
34bf3db
replace proto.Codec with a guess of size needed
apolcyn Feb 12, 2017
c8c475c
update Fatalf -> Errorf in tests
apolcyn Feb 13, 2017
8431a0d
wrap proto.Buffer along with cached last size into larger struct for …
apolcyn Feb 14, 2017
f2b8177
make wrapped proto buffer only a literal
apolcyn Feb 14, 2017
225b3ae
fix style and imports
apolcyn Feb 14, 2017
f841695
move b.Run into inner function
apolcyn Feb 14, 2017
776137a
reverse micro benchmark op order to unmarshal-marshal and fix benchma…
apolcyn Feb 14, 2017
93d4020
add test for large message
apolcyn Feb 15, 2017
d30dcba
remove use of defer in codec.marshal
apolcyn Feb 22, 2017
126640b
revert recent changes to codec bencmarks
apolcyn Feb 27, 2017
18485fe
move sub-benchmarks into >= go-1.7 only file
apolcyn Feb 27, 2017
93c3e12
add commentfor marshaler and tweak benchmark subtests for easier usage
apolcyn Feb 27, 2017
5ef6c9b
move build tag for go1.7 on benchmarks to inside file
apolcyn Feb 27, 2017
42d712a
move build tag to top of file
apolcyn Feb 27, 2017
1517ac9
comment Codec, embed proto.Buffer into cached struct and add an int32…
apolcyn Mar 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
comment Codec, embed proto.Buffer into cached struct and add an int32…
… cap
  • Loading branch information
apolcyn committed Mar 27, 2017
commit 1517ac9bff83c1fb38b42dc7e4ef1abd579241f3
34 changes: 22 additions & 12 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@
package grpc

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
)

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
// a Codec's methods can be called from concurrent goroutines.
type Codec interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumb question: are codecs expected to be thread safe? will Codecs ever have user visible state? I know the one below doesn't but this is becoming part of public API, so it may be worth documenting how to use this more clearly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually the Codec interface is here currently, this PR moves it from where it was (used to be in https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L57).

Added a comment though to mention that implementations must be thread safe.

// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
Expand All @@ -55,21 +58,28 @@ type protoCodec struct {
}

type cachedProtoBuffer struct {
lastMarshaledSize int32
buffer proto.Buffer
lastMarshaledSize uint32
proto.Buffer
}

func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}

func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)

cb.buffer.SetBuf(newSlice)
cb.buffer.Reset()
if err := cb.buffer.Marshal(protoMsg); err != nil {
cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
return nil, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err := buffer.Marshal(protoMsg); err != nil {
return nil, err
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine, if there were an error while Marshaling, we'd still want to put the buffer back in the pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woops thanks good catch I missed that. put all of the cleanup into a defer

out := cb.buffer.Bytes()
cb.lastMarshaledSize = int32(len(out))
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
return out, nil
}

Expand All @@ -78,16 +88,16 @@ func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
out, err := p.marshal(v, cb)

// put back buffer and lose the ref to the slice
cb.buffer.SetBuf(nil)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return out, err
}

func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.buffer.SetBuf(data)
err := cb.buffer.Unmarshal(v.(proto.Message))
cb.buffer.SetBuf(nil)
cb.SetBuf(data)
err := cb.Unmarshal(v.(proto.Message))
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return err
}
Expand All @@ -100,7 +110,7 @@ var (
protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
buffer: proto.Buffer{},
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
},
Expand Down