Skip to content

Commit

Permalink
Make IO Buffer size configurable. (grpc#1544)
Browse files Browse the repository at this point in the history
* Make IO Buffer size configurable.

* Fixing typo
  • Loading branch information
MakMukhi authored Sep 28, 2017
1 parent 6014154 commit 8214c28
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 9 deletions.
4 changes: 4 additions & 0 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
if nw != nil {
lis = nw.Listener(lis)
}
opts = append(opts, grpc.WriteBufferSize(128*1024))
opts = append(opts, grpc.ReadBufferSize(128*1024))
s := grpc.NewServer(opts...)
switch info.Type {
case "protobuf":
Expand Down Expand Up @@ -236,6 +238,8 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli

// NewClientConn creates a gRPC client connection to addr.
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
opts = append(opts, grpc.WithWriteBufferSize(128*1024))
opts = append(opts, grpc.WithReadBufferSize(128*1024))
conn, err := grpc.Dial(addr, opts...)
if err != nil {
grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
Expand Down
16 changes: 16 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ const (
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
func WithWriteBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.WriteBufferSize = s
}
}

// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for each read syscall.
func WithReadBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.ReadBufferSize = s
}
}

// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func WithInitialWindowSize(s int32) DialOption {
Expand Down
20 changes: 20 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type options struct {
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
}

var defaultServerOptions = options{
Expand All @@ -126,6 +128,22 @@ var defaultServerOptions = options{
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
o.writeBufferSize = s
}
}

// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
o.readBufferSize = s
}
}

// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
Expand Down Expand Up @@ -524,6 +542,8 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
dynamicWindow = false
}
var buf bytes.Buffer
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
writeBufSize = opts.WriteBufferSize
}
readBufSize := defaultReadBufSize
if opts.ReadBufferSize > 0 {
readBufSize = opts.ReadBufferSize
}
t := &http2Client{
ctx: ctx,
target: addr.Addr,
Expand All @@ -209,9 +217,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
framer: newFramer(conn, writeBufSize, readBufSize),
controlBuf: newControlBuffer(),
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
Expand Down
10 changes: 9 additions & 1 deletion transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
writeBufSize := defaultWriteBufSize
if config.WriteBufferSize > 0 {
writeBufSize = config.WriteBufferSize
}
readBufSize := defaultReadBufSize
if config.ReadBufferSize > 0 {
readBufSize = config.ReadBufferSize
}
framer := newFramer(conn, writeBufSize, readBufSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
Expand Down
9 changes: 5 additions & 4 deletions transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const (
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
// http2IOBufSize specifies the buffer size for sending frames.
http2IOBufSize = 32 * 1024
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)

var (
Expand Down Expand Up @@ -474,10 +475,10 @@ type framer struct {
fr *http2.Framer
}

func newFramer(conn net.Conn) *framer {
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
f := &framer{
reader: bufio.NewReaderSize(conn, http2IOBufSize),
writer: bufio.NewWriterSize(conn, http2IOBufSize),
reader: bufio.NewReaderSize(conn, readBufferSize),
writer: bufio.NewWriterSize(conn, writeBufferSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
// Opt-in to Frame reuse API on framer to reduce garbage.
Expand Down
8 changes: 7 additions & 1 deletion transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ type ServerConfig struct {
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
}

// NewServerTransport creates a ServerTransport with conn or non-nil error
Expand Down Expand Up @@ -503,6 +505,10 @@ type ConnectOptions struct {
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
InitialConnWindowSize int32
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
}

// TargetInfo contains the information of the target such as network address and metadata.
Expand All @@ -526,7 +532,7 @@ type Options struct {

// Delay is a hint to the transport implementation for whether
// the data could be buffered for a batching write. The
// Transport implementation may ignore the hint.
// transport implementation may ignore the hint.
Delay bool
}

Expand Down
4 changes: 2 additions & 2 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,8 +1989,8 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err)
return
}
reader := bufio.NewReaderSize(s.conn, http2IOBufSize)
writer := bufio.NewWriterSize(s.conn, http2IOBufSize)
reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize)
writer := bufio.NewWriterSize(s.conn, defaultReadBufSize)
framer := http2.NewFramer(writer, reader)
if err = framer.WriteSettingsAck(); err != nil {
t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
Expand Down

0 comments on commit 8214c28

Please sign in to comment.