Skip to content

Commit

Permalink
KAFKA-14766: Improve performance of VarInt encoding and decoding (#13312)
Browse files Browse the repository at this point in the history

Motivation

Reading/writing the protocol buffer varInt32 and varInt64 (also called varLong in our code base) is in the hot path of the data plane code in Apache Kafka. We read multiple varInts and varLongs in every record. Hence, even a minor improvement in their performance can translate into a larger overall performance benefit.

In this PR, we only update varInt32 encoding/decoding.
Changes

This change uses loop unrolling and reduces the amount of repetition of calculations. Based on the empirical results from the benchmark, the code has been modified to pick up the best implementation.
Results

Performance has been evaluated using JMH benchmarks on JDK 17.0.6. Various implementations have been added to the benchmark, and benchmarking has been done for different sizes of varints and varlongs. The benchmarks for the various implementations have been added in ByteUtilsBenchmark.java.

Reviewers: Ismael Juma <[email protected]>, Luke Chen <[email protected]>, Alexandre Dupriez <[email protected]>
  • Loading branch information
divijvaidya authored May 5, 2023
1 parent e34f884 commit 6bcc497
Show file tree
Hide file tree
Showing 4 changed files with 772 additions and 65 deletions.
156 changes: 123 additions & 33 deletions clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* This class exposes low-level methods for reading/writing from byte streams or buffers.
*
* The implementation of these methods has been tuned for the JVM; the empirical measurements behind
* these choices can be reproduced using ByteUtilsBenchmark.java
*/
public final class ByteUtils {

Expand Down Expand Up @@ -144,47 +148,84 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
* Read an integer stored in variable-length format using unsigned decoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
*
* The implementation is based on Netty's decoding of varint.
* @see <a href="https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73">Netty's varint decoding</a>
*
* @param buffer The buffer to read from
* @return The integer read
*
* @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
*/
public static int readUnsignedVarint(ByteBuffer buffer) {
int value = 0;
int i = 0;
int b;
while (((b = buffer.get()) & 0x80) != 0) {
value |= (b & 0x7f) << i;
i += 7;
if (i > 28)
throw illegalVarintException(value);
byte tmp = buffer.get();
if (tmp >= 0) {
return tmp;
} else {
int result = tmp & 127;
if ((tmp = buffer.get()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 127) << 7;
if ((tmp = buffer.get()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 127) << 14;
if ((tmp = buffer.get()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 127) << 21;
result |= (tmp = buffer.get()) << 28;
if (tmp < 0) {
throw illegalVarintException(result);
}
}
}
}
return result;
}
value |= b << i;
return value;
}

/**
* Read an integer stored in variable-length format using unsigned decoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
*
* The implementation is based on Netty's decoding of varint.
* @see <a href="https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73">Netty's varint decoding</a>
*
* @param in The input to read from
* @return The integer read
*
* @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
* @throws IOException if {@link DataInput} throws {@link IOException}
* @throws EOFException if {@link DataInput} throws {@link EOFException}
*/
public static int readUnsignedVarint(DataInput in) throws IOException {
int value = 0;
int i = 0;
int b;
while (((b = in.readByte()) & 0x80) != 0) {
value |= (b & 0x7f) << i;
i += 7;
if (i > 28)
throw illegalVarintException(value);
static int readUnsignedVarint(DataInput in) throws IOException {
byte tmp = in.readByte();
if (tmp >= 0) {
return tmp;
} else {
int result = tmp & 127;
if ((tmp = in.readByte()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 127) << 7;
if ((tmp = in.readByte()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 127) << 14;
if ((tmp = in.readByte()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 127) << 21;
result |= (tmp = in.readByte()) << 28;
if (tmp < 0) {
throw illegalVarintException(result);
}
}
}
}
return result;
}
value |= b << i;
return value;
}

/**
Expand Down Expand Up @@ -250,6 +291,12 @@ public static long readVarlong(DataInput in) throws IOException {
* @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
*/
public static long readVarlong(ByteBuffer buffer) {
    // Decode the raw unsigned varlong first, then undo the zig-zag mapping:
    // the lowest bit selects the sign, the remaining bits carry the magnitude.
    final long encoded = readUnsignedVarlong(buffer);
    final long signMask = -(encoded & 1);
    return (encoded >>> 1) ^ signMask;
}

// visible for testing
static long readUnsignedVarlong(ByteBuffer buffer) {
long value = 0L;
int i = 0;
long b;
Expand All @@ -260,7 +307,7 @@ public static long readVarlong(ByteBuffer buffer) {
throw illegalVarlongException(value);
}
value |= b << i;
return (value >>> 1) ^ -(value & 1);
return value;
}

/**
Expand Down Expand Up @@ -288,33 +335,68 @@ public static double readDouble(ByteBuffer buffer) {
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
* into the buffer.
*
* Implementation copied from https://github.com/astei/varint-writing-showdown/tree/dev (MIT License)
* @see <a href="https://github.com/astei/varint-writing-showdown/blob/6b1a4baec4b1f0ce65fa40cf0b282ec775fdf43e/src/jmh/java/me/steinborn/varintshowdown/res/SmartNoDataDependencyUnrolledVarIntWriter.java#L8"> Sample implementation </a>
*
* @param value The value to write
* @param buffer The output to write to
*/
public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
while ((value & 0xffffff80) != 0L) {
byte b = (byte) ((value & 0x7f) | 0x80);
buffer.put(b);
value >>>= 7;
if ((value & (0xFFFFFFFF << 7)) == 0) {
buffer.put((byte) value);
} else {
buffer.put((byte) (value & 0x7F | 0x80));
if ((value & (0xFFFFFFFF << 14)) == 0) {
buffer.put((byte) ((value >>> 7) & 0xFF));
} else {
buffer.put((byte) ((value >>> 7) & 0x7F | 0x80));
if ((value & (0xFFFFFFFF << 21)) == 0) {
buffer.put((byte) ((value >>> 14) & 0xFF));
} else {
buffer.put((byte) ((value >>> 14) & 0x7F | 0x80));
if ((value & (0xFFFFFFFF << 28)) == 0) {
buffer.put((byte) ((value >>> 21) & 0xFF));
} else {
buffer.put((byte) ((value >>> 21) & 0x7F | 0x80));
buffer.put((byte) ((value >>> 28) & 0xFF));
}
}
}
}
buffer.put((byte) value);
}

/**
* Write the given integer following the variable-length unsigned encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
* into the buffer.
*
* For implementation notes, see {@link #writeUnsignedVarint(int, ByteBuffer)}
*
* @param value The value to write
* @param out The output to write to
*/
public static void writeUnsignedVarint(int value, DataOutput out) throws IOException {
while ((value & 0xffffff80) != 0L) {
byte b = (byte) ((value & 0x7f) | 0x80);
out.writeByte(b);
value >>>= 7;
if ((value & (0xFFFFFFFF << 7)) == 0) {
out.writeByte(value);
} else {
out.writeByte(value & 0x7F | 0x80);
if ((value & (0xFFFFFFFF << 14)) == 0) {
out.writeByte(value >>> 7);
} else {
out.writeByte((value >>> 7) & 0x7F | 0x80);
if ((value & (0xFFFFFFFF << 21)) == 0) {
out.writeByte(value >>> 14);
} else {
out.writeByte((byte) ((value >>> 14) & 0x7F | 0x80));
if ((value & (0xFFFFFFFF << 28)) == 0) {
out.writeByte(value >>> 21);
} else {
out.writeByte((value >>> 21) & 0x7F | 0x80);
out.writeByte(value >>> 28);
}
}
}
}
out.writeByte((byte) value);
}

/**
Expand Down Expand Up @@ -368,6 +450,11 @@ public static void writeVarlong(long value, DataOutput out) throws IOException {
*/
public static void writeVarlong(long value, ByteBuffer buffer) {
    // Zig-zag encode the signed value: shift the magnitude left by one and fold
    // the sign (all ones for negatives, all zeros otherwise) into the low bits,
    // then emit the result as an unsigned varlong.
    final long signBits = value >> 63;
    final long zigZag = (value << 1) ^ signBits;
    writeUnsignedVarlong(zigZag, buffer);
}

// visible for testing and benchmarking
public static void writeUnsignedVarlong(long v, ByteBuffer buffer) {
while ((v & 0xffffffffffffff80L) != 0L) {
byte b = (byte) ((v & 0x7f) | 0x80);
buffer.put(b);
Expand Down Expand Up @@ -437,8 +524,11 @@ public static int sizeOfVarint(int value) {
* @see #sizeOfUnsignedVarint(int)
*/
public static int sizeOfVarlong(long value) {
    // Zig-zag map the signed value to its unsigned representation once, then
    // delegate the byte-count computation to the unsigned variant.
    long v = (value << 1) ^ (value >> 63);
    return sizeOfUnsignedVarlong(v);
}

// visible for benchmarking
public static int sizeOfUnsignedVarlong(long v) {
// For implementation notes @see #sizeOfUnsignedVarint(int)
// Similar logic is applied to allow for 64bit input -> 1-9byte output.
// return (70 - leadingZeros) / 7 + leadingZeros / 64;
Expand Down
Loading

0 comments on commit 6bcc497

Please sign in to comment.