Skip to content

Commit

Permalink
removed sequence number from buffered reader
Browse files Browse the repository at this point in the history
fix for BytePool 0 byte allocations
  • Loading branch information
arnecls committed May 3, 2016
1 parent 2dbe5d0 commit 146dc3c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
4 changes: 4 additions & 0 deletions tcontainer/bytepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func NewBytePoolWithSize(tinyChunkCount int, smallChunkCount int, mediumChunkCou
// If possible this slice is coming from a pool. Slices are automatically
// returned to the pool. No additional action is necessary.
func (b *BytePool) Get(size int) []byte {
if size == 0 {
return []byte{}
}

slab, normalized := b.getSlab(size)
if slab == nil {
return make([]byte, size) // ### return, oversized ###
Expand Down
25 changes: 9 additions & 16 deletions tio/bufferedreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (b bufferError) Error() string {

// BufferReadCallback defines the function signature for callbacks passed to
// ReadAll.
type BufferReadCallback func(msg []byte, sequence uint64)
type BufferReadCallback func(msg []byte)

// BufferDataInvalid is returned when a parsing encounters an error
var BufferDataInvalid = bufferError("Invalid data")
Expand All @@ -88,13 +88,10 @@ var BufferDataInvalid = bufferError("Invalid data")
// A data "piece" is considered complete if a delimiter or a certain runlength
// has been reached. The latter has to be enabled by flag and will disable the
// default behavior, which is looking for a delimiter string.
// In addition to that every data "piece" will receive an incrementing sequence
// number.
type BufferedReader struct {
data []byte
delimiter []byte
parse func() ([]byte, int)
sequence uint64
paramMLE int
growSize int
end int
Expand All @@ -118,7 +115,6 @@ func NewBufferedReader(bufferSize int, flags BufferedReaderFlags, offsetOrLength
delimiter: []byte(delimiter),
paramMLE: offsetOrLength,
encoding: binary.LittleEndian,
sequence: 0,
end: 0,
flags: flags,
growSize: bufferSize,
Expand Down Expand Up @@ -153,7 +149,6 @@ func NewBufferedReader(bufferSize int, flags BufferedReaderFlags, offsetOrLength

// Reset clears the buffer by resetting its internal state
func (buffer *BufferedReader) Reset(sequence uint64) {
buffer.sequence = sequence
buffer.end = 0
buffer.incomplete = true
}
Expand Down Expand Up @@ -267,9 +262,9 @@ func (buffer *BufferedReader) parseMLE64() ([]byte, int) {
// If callback is nil, data will be read and discarded.
func (buffer *BufferedReader) ReadAll(reader io.Reader, callback BufferReadCallback) error {
for {
data, seq, more, err := buffer.ReadOne(reader)
data, more, err := buffer.ReadOne(reader)
if data != nil && callback != nil {
callback(data, seq)
callback(data)
}

if err != nil {
Expand All @@ -289,16 +284,16 @@ func (buffer *BufferedReader) ReadAll(reader io.Reader, callback BufferReadCallb
// could be parsed.
// Errors are returned if reading from the stream failed or the parser
// encountered an error.
func (buffer *BufferedReader) ReadOne(reader io.Reader) (data []byte, seq uint64, more bool, err error) {
func (buffer *BufferedReader) ReadOne(reader io.Reader) (data []byte, more bool, err error) {
if buffer.incomplete {
bytesRead, err := reader.Read(buffer.data[buffer.end:])

if err != nil && err != io.EOF {
return nil, 0, buffer.end > 0, err // ### return, error reading ###
return nil, buffer.end > 0, err // ### return, error reading ###
}

if bytesRead == 0 {
return nil, 0, buffer.end > 0, err // ### return, no data ###
return nil, buffer.end > 0, err // ### return, no data ###
}

buffer.end += bytesRead
Expand All @@ -310,7 +305,7 @@ func (buffer *BufferedReader) ReadOne(reader io.Reader) (data []byte, seq uint64
if nextMsgIdx == -1 {
buffer.end = 0
buffer.incomplete = true
return nil, 0, true, BufferDataInvalid // ### return, invalid data ###
return nil, true, BufferDataInvalid // ### return, invalid data ###
}

if msgData == nil {
Expand All @@ -321,7 +316,7 @@ func (buffer *BufferedReader) ReadOne(reader io.Reader) (data []byte, seq uint64
copy(buffer.data, temp)
}
buffer.incomplete = true
return nil, 0, true, err // ### return, incomplete ###
return nil, true, err // ### return, incomplete ###
}

msgDataCopy := make([]byte, len(msgData))
Expand All @@ -335,7 +330,5 @@ func (buffer *BufferedReader) ReadOne(reader io.Reader) (data []byte, seq uint64
buffer.incomplete = true
}

seqNum := buffer.sequence
buffer.sequence++
return msgDataCopy, seqNum, buffer.end > 0, err
return msgDataCopy, buffer.end > 0, err
}

0 comments on commit 146dc3c

Please sign in to comment.