use proto.Buffer API for protobuf codec and cache proto.Buffer structs #1010
Conversation
Force-pushed from b17cf19 to c7a3c17
The first commit here used a non-atomic global counter to index into one of the sharded pools (a token was used only to alloc and free from the same pool). It still makes an overall QPS improvement of over 10%; there's a small new cost, but the earlier version was breaking the race detection test.
On top of all of the changes referenced in #1031, the latest two commits reduce object alloc counts by about 15-20% and reduce space allocated by ~300MB. QPS isn't improved much beyond noise.
more on what the last two commits do:
after these changes it changes to:
(On the removal of the second …
@apolcyn Perhaps I don't understand this change: it appears that this change makes the marshaller / unmarshaller just reuse the proto.Buffer objects, rather than reuse the underlying slices. Is that correct? IIRC it's the slices that are actually expensive to allocate, not the protobuf wrappers around them. I'm curious how this improves performance. The only way I could imagine this helping at all is when encoding proto2, which allocates new ints on the heap and caches them inside the proto.Buffer. Even then, it only helps with encoding, not decoding, and they disappear after a GC. That would hurt most of the gains from this PR. Am I missing something?
For the previous comment: discussed offline, but the main benefit of the memory saving seems to come from caching the …
Heads up on another change for this PR: cc @carl-mastrangelo, actually revisiting the use of … A small micro-benchmark comparison of the two pools (repeatedly doing concurrent marshals/unmarshals) shows similar results, with maybe a slight (within 10%) improvement from the custom cache in here. But if anything the overall effect seems better with … This can probably be a lot simpler using just the …
Ending this review on these comments so far. Some of them apply to the rest of the code too.
codec.go (Outdated)
```go
	protoSizeFieldLength = 4
)

var protoMsg = v.(proto.Message)
```
can get away with protoMsg := v.(proto.Message)
done
codec.go (Outdated)
```go
var protoMsg = v.(proto.Message)
// adding 4 to proto.Size avoids an extra allocation when appending the 4 byte length
// field in 'proto.Buffer.enc_len_thing'
```
Same nitpick about the comment; it should look like a complete sentence.
done
codec.go (Outdated)
```go
func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
	const (
		protoSizeFieldLength = 4
```
Could this be a global const? That'd save allocating the const every time the function is called.
I think this should be getting compiled in. Keeping it since it's only used locally? (e.g., go bench memory stats show no difference)
You can confirm by looking at the highly readable assembly produced
I actually noticed that Go proto objects in micro benchmarks aren't doing the realloc in enc_len_thing the way it happens in the large full gRPC benchmarks. I took this out for right now. Is this only needed on certain protos, or fixed in new versions of the Go proto compiler?
codec.go (Outdated)
```go
buffer := protoBufferPool.Get().(*proto.Buffer)

newSlice := make([]byte, sizeNeeded)
```
Since sizeNeeded isn't used anywhere else, it might make more sense to just remove that variable:
```go
newSlice := make([]byte, proto.Size(protoMsg)+protoSizeFieldLength)
```
done
codec.go (Outdated)
```go
err := buffer.Marshal(protoMsg)
if err != nil {
	return nil, err
}
```
```go
if err := buffer.Marshal(protoMsg); err != nil {
	return nil, err
}
```
I imagine, if there were an error while Marshaling, we'd still want to put the buffer back in the pool?
Whoops, thanks, good catch; I missed that. Put all of the cleanup into a defer.
codec_test.go (Outdated)
```go
}

func marshalAndUnmarshal(protoCodec Codec, expectedBody []byte, t *testing.T) {
	original := &codec_perf.Buffer{}
```
```go
original := &codec_perf.Buffer{
	Body: expectedBody,
}
```
Also, can we reuse this variable instead of creating the variable deserialized? That's just a nitpick again.
done
codec_test.go (Outdated)
```go
result := deserialized.GetBody()

if bytes.Compare(result, expectedBody) != 0 {
```
Since result is not used anywhere else, maybe it's better to use deserialized.GetBody() inline, here.
done
codec_test.go (Outdated)
```go
// Try to catch possible race conditions around use of pools
func TestConcurrentUsage(t *testing.T) {
	const numGoRoutines = 100
```
```go
const (
	numGoRoutines = ..
	...
)
```
done
```go
}

var wg sync.WaitGroup
codec := protoCodec{}
```
This is inconsistent with how the previous function initializes protoCodec (via newProtoCodec). However, this looks cleaner. Maybe remove the function newProtoCodec() altogether?
done
codec_test.go (Outdated)
```go
// This tries to make sure that buffers weren't stomped on
// between marshals on codecs taking from the same pool.
func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
```
This function could also use the styling comments mentioned above.
reworded this
codec.go (Outdated)
```go
protoMsg := v.(proto.Message)
buffer := protoBufferPool.Get().(*proto.Buffer)
defer func() {
```
You might consider rewriting this without the defers. They have become faster recently, but they aren't free.
Also benchmark it.
At least local protoCodec micro benchmarks showed no difference beyond the usual noise with and without defers. If that sounds OK, I'm thinking we should keep them in?
codec.go (Outdated)
```go
// Adding 4 to proto.Size avoids an extra allocation when appending the 4 byte length
// field in 'proto.Buffer.enc_len_thing'.
newSlice := make([]byte, proto.Size(protoMsg)+protoSizeFieldLength)
```
Size is really expensive to calculate. It may be faster to not call it and just guess.
I replaced the proto.Size calculation with a guess based on the length of buffer.GetBytes from the last call. Micro benchmarks showed something around a 20% improvement! E.g., under SetParallelism(1), usual times went from ~130-140ns to ~100-110ns.
codec_test.go (Outdated)
```go
}

func BenchmarkProtoCodec10Goroutines(b *testing.B) {
	benchmarkProtoCodecConcurrentUsage(10, b)
```
This can be done using sub benchmarks. See https://golang.org/pkg/testing/#B.Run
Also, for that matter: b.SetParallelism
done
codec_test.go
Outdated
codec := &protoCodec{} | ||
var wg sync.WaitGroup | ||
for i := 0; i < goroutines; i++ { | ||
wg.Add(1) |
nit: Add is a synchronous call, which may make the benchmarks noisier than they could be. Consider calling it just once.
All of this changed to use the RunParallel framework.
codec_test.go (Outdated)
```go
func fastMarshalAndUnmarshal(protoCodec Codec, protoStruct interface{}, b *testing.B) {
	marshalledBytes, err := protoCodec.Marshal(protoStruct)
	if err != nil {
		b.Fatalf("protoCodec.Marshal(_) returned an error")
```
Errorf.
done
codec_test.go
Outdated
i += 2 | ||
b.Run(fmt.Sprintf("BenchmarkProtoCodec_SetParallelism(%v)", p), func(b *testing.B) { | ||
codec := &protoCodec{} | ||
b.SetParallelism(p) |
This is racy, no? p is modified across the loop.
a bit stuck, isn't p loop-local? or at least closed over each round?
Scratch the "closed over" part :) but I thought it should be loop-local. Also moved it inside the inner function.
codec_test.go
Outdated
var err error | ||
|
||
if m1, err = codec1.Marshal(&proto1); err != nil { | ||
t.Fatalf("protoCodec.Marshal(%v) failed", proto1) |
Errorf
If any conditional here doesn't depend on the previous one, it should be Errorf
fixed
codec.go
Outdated
func (p protoCodec) Unmarshal(data []byte, v interface{}) error { | ||
buffer := protoBufferPool.Get().(*proto.Buffer) | ||
buffer.SetBuf(data) | ||
err := buffer.Unmarshal(v.(proto.Message)) |
I don't know if it is somehow possible, but Go will check the type every time. You might consider benchmarking without this interface punning to see if it helps at all. If it does, you could modify the code to somehow avoid the type pun.
I tried this one btw, but unfortunately the differences between runs seemed within the usual noise.
codec.go
Outdated
return nil, err | ||
} | ||
out := buffer.Bytes() | ||
atomic.StoreUint32(&lastMarshaledSize, uint32(len(out))) |
This will be a contention point right? If marshalling hundreds of thousands of RPCs per second this will contend heavily.
The latest update wraps the cached proto.Buffer structs into a larger struct with a cached size too, so atomics are no longer used.
I tried sync.Pool too; although benchmark speeds weren't decreased, the mutex contention profiler seemed to show much higher contended-mutex counts and times with the separate pools rather than a single pool (something like 2 min vs. 1 min).
The results I'm getting are a bit noisy and hard to make complete sense of, but the latest updates seem to definitely make an improvement in micro benchmarks over the previous proto.Size calls.
Force-pushed from eff7748 to 0293616
It seems I ended up needing to go back to not using …
FTR the latest update removed the use of defer. Re-running benchmarks shows a small but seemingly noticeable difference, something like 95ns -> 90ns. And it seems simple to remove.
Force-pushed from 16f4d51 to 1517ac9 ("…rk setup-in-test bug")
FTR the previous Travis run seemed to have hit a flake in … But Travis is now green: this PR was rebased on master, with go-1.5 builds no longer running.
use proto.Buffer API for protobuf codec and cache proto.Buffer structs (grpc#1010)

* use a global sharded pool of proto.Buffer caches in protoCodec
* fix goimports
* make global buffer pool index counter atomic
* hack to remove alloc in encode_len_struct
* remove extra slice alloc in proto codec marshal
* replace magic number for proto size field length with constant
* replace custom cache with sync.Pool
* remove 1 line functions in codec.go and add protoCodec microbenchmarks
* add concurrent usage test for protoCodec
* fix golint, gofmt, goimport checks
* fix issues in codec.go and codec_test.go
* use go parallel benchmark helpers
* replace proto.Codec with a guess of size needed
* update Fatalf -> Errorf in tests
* wrap proto.Buffer along with cached last size into larger struct for pool use
* make wrapped proto buffer only a literal
* fix style and imports
* move b.Run into inner function
* reverse micro benchmark op order to unmarshal-marshal and fix benchmark setup-in-test bug
* add test for large message
* remove use of defer in codec.marshal
* revert recent changes to codec benchmarks
* move sub-benchmarks into >= go-1.7 only file
* add comment for marshaler and tweak benchmark subtests for easier usage
* move build tag for go1.7 on benchmarks to inside file
* move build tag to top of file
* comment Codec, embed proto.Buffer into cached struct and add an int32 cap
Replaces #955, which was overly invasive.

#955 used "per-transport" pools, only as a way to reduce contention on the proto.Buffer caches. If we use a simple global cache of proto.Buffer instead, the change doesn't have to affect anything outside of the protoCodec, but experimentally contention goes up and the benefit on streaming QPS is removed. Sharding this pool seems to fix that contention: streaming QPS gets the slightly-greater-than-15% improvement that it got earlier with the "per-transport" pools.
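The sharding idea described above can be sketched as follows. This is a conceptual illustration with hypothetical names (shardedPool, get, put), not the PR's code: an atomic round-robin counter picks a shard, so concurrent callers spread across several small mutex-guarded free lists instead of contending on one. (The first-commit variant discussed earlier used a non-atomic counter, which is what tripped the race detector, and returned buffers to the same shard via a token; this sketch keeps the counter atomic and picks a shard on put too.)

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numShards = 8

// shardedPool spreads Get/Put traffic across several mutex-guarded
// free lists to reduce contention on a single global cache.
type shardedPool struct {
	counter uint32 // atomic round-robin shard picker
	shards  [numShards]struct {
		mu   sync.Mutex
		free [][]byte
	}
}

func (p *shardedPool) get() []byte {
	s := &p.shards[atomic.AddUint32(&p.counter, 1)%numShards]
	s.mu.Lock()
	defer s.mu.Unlock()
	if n := len(s.free); n > 0 {
		buf := s.free[n-1]
		s.free = s.free[:n-1]
		return buf[:0] // recycled: keep capacity, reset length
	}
	return make([]byte, 0, 64) // shard empty: allocate fresh
}

func (p *shardedPool) put(buf []byte) {
	s := &p.shards[atomic.AddUint32(&p.counter, 1)%numShards]
	s.mu.Lock()
	s.free = append(s.free, buf)
	s.mu.Unlock()
}

func main() {
	var p shardedPool
	buf := append(p.get(), "payload"...)
	p.put(buf)
	fmt.Println(len(p.get())) // 0: buffers always come back length-reset
}
```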