KAFKA-6432: Make index lookup more cache friendly (apache#5346)

For each topic-partition, the Kafka broker maintains two indices: one for message offsets and one for message timestamps. By default, a new entry is appended to each index for every 4KB of messages. An index lookup is a simple binary search. The indices are mmap'ed files and are cached by the Linux page cache.
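As an illustration of that layout, here is a minimal, hypothetical reader (not code from this commit) that mmaps an offset index and binary searches its fixed-size entries. The 8-byte entry layout (4-byte relative offset, 4-byte file position) matches Kafka's offset index; the object and method names are made up.

    // Hypothetical reader, for illustration only. Assumes the 8-byte
    // offset-index entry layout; names and error handling are invented.
    import java.io.RandomAccessFile
    import java.nio.channels.FileChannel

    object OffsetIndexReader {
      val EntrySize = 8  // 4-byte relative offset + 4-byte file position

      // Return the file position stored in the largest entry whose
      // relative offset is <= relativeOffset.
      def positionFor(indexPath: String, relativeOffset: Int): Int = {
        val raf = new RandomAccessFile(indexPath, "r")
        try {
          val mmap = raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, raf.length)
          val entries = mmap.limit() / EntrySize
          require(entries > 0, "empty index")
          var lo = 0
          var hi = entries - 1
          while (lo < hi) {
            val mid = lo + (hi - lo + 1) / 2  // round up so `lo = mid` makes progress
            if (mmap.getInt(mid * EntrySize) > relativeOffset) hi = mid - 1
            else lo = mid
          }
          mmap.getInt(lo * EntrySize + 4)     // file position of that entry
        } finally raf.close()
      }
    }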

Both consumer fetches and follower fetches have to do an offset lookup before accessing the actual message data. The simple binary search algorithm used for the lookup is not cache friendly, and may cause page faults even on high-QPS topic-partitions.
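To see why, consider how many distinct pages a lookup near the end of the index touches. The following simulation is illustrative only (it is not part of this change) and assumes 4 KiB pages and 8-byte offset-index entries; it probes entries the way a ceiling-midpoint binary search does and records the page of each probe.

    // Hypothetical simulation: which 4 KiB pages does a standard binary
    // search touch when the target sits in the last page of a 13-page index?
    object BinarySearchPageTouches {
      val EntrySize = 8
      val PageSize = 4096
      val EntriesPerPage = PageSize / EntrySize   // 512

      // Probe slots like the index lookup does, recording the page of each
      // probed entry. Entry i's "key" is simply i here.
      def pagesTouched(numEntries: Int, target: Int): Seq[Int] = {
        val pages = scala.collection.mutable.LinkedHashSet[Int]()
        var lo = 0
        var hi = numEntries - 1
        while (lo < hi) {
          val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
          pages += mid / EntriesPerPage
          if (mid > target) hi = mid - 1
          else if (mid < target) lo = mid
          else { lo = mid; hi = mid }           // exact hit, stop
        }
        pages.toSeq
      }

      def main(args: Array[String]): Unit = {
        val numEntries = 13 * EntriesPerPage
        println(pagesTouched(numEntries, numEntries - 1).mkString("pages: ", ", ", ""))
      }
    }

For a 13-page index this prints pages 6, 9, 11, and 12; the real lookup additionally reads page 0 for the lower-boundary check, matching the five pages discussed in the code comment of the diff below.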

In a normal Kafka broker, all follower fetch requests and most consumer fetch requests look up only the last few entries of the index. We can make the index lookup more cache friendly by searching the last one or two pages of the index first.
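The patch implements this inside AbstractIndex; as a standalone sketch (simplified, with illustrative names, and a sorted array standing in for the index), the split looks like this:

    // Illustrative only: `entries` stands in for the index (sorted ascending),
    // warmEntries for the size of the "warm" tail.
    def lookup(entries: Array[Long], target: Long, warmEntries: Int): Int = {
      // Largest slot whose value is <= target within [begin, end].
      def binarySearch(begin: Int, end: Int): Int = {
        var lo = begin
        var hi = end
        while (lo < hi) {
          val mid = lo + (hi - lo + 1) / 2
          if (entries(mid) > target) hi = mid - 1 else lo = mid
        }
        lo
      }
      val firstWarm = math.max(0, entries.length - 1 - warmEntries)
      if (entries(firstWarm) <= target)
        binarySearch(firstWarm, entries.length - 1)  // hot path: last pages only
      else
        binarySearch(0, firstWarm)  // cold path (assumes target >= entries(0))
    }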

Reviewers: Colin Patrick McCabe <[email protected]>, Guozhang Wang <[email protected]>, Ted Yu <[email protected]>, Ismael Juma <[email protected]>, Sriharsha Chintalapani <[email protected]>
ying-zheng authored and harshach committed Jul 27, 2018
1 parent 061885e commit a61594d
Showing 1 changed file with 84 additions and 17 deletions.

core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -44,9 +44,67 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   // Length of the index file
   @volatile
   private var _length: Long = _
 
   protected def entrySize: Int
 
+  /*
+   Kafka mmaps index files into memory, and all index read/write operations go through the OS page cache. This
+   avoids blocked disk I/O in most cases.
+
+   To the best of our knowledge, all modern operating systems use an LRU policy (or a variant of it) to manage the
+   page cache. Kafka always appends to the end of the index file, and almost all index lookups (typically from
+   in-sync followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy
+   should work very well with Kafka's index access pattern.
+
+   However, when looking up an index, the standard binary search algorithm is not cache friendly, and can cause
+   unnecessary page faults (the thread is blocked waiting for index entries to be read from disk, because those
+   entries are not cached in the page cache).
+
+   For example, in an index with 13 pages, to look up an entry in the last page (page #12), the standard binary
+   search algorithm reads index entries in pages #0, 6, 9, 11, and 12.
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+   steps:       |1| | | | | |3| | |4|  |5 |2/6|
+   Each page holds hundreds of index entries, corresponding to hundreds to thousands of kafka messages. While the
+   index gradually grows from the 1st entry of page #12 to the last entry of page #12, all the write (append)
+   operations are in page #12, and every in-sync follower / consumer lookup reads pages #0, 6, 9, 11, and 12. As
+   these pages are used in every in-sync lookup, we can assume they are fairly recently used and are very likely
+   to be in the page cache. When the index grows to page #13, the pages needed in an in-sync lookup change to
+   #0, 7, 10, 12, and 13:
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+   steps:       |1| | | | | | |3| | | 4|5 | 6|2/7|
+   Page #7 and page #10 have not been used for a very long time, so they are much less likely to be in the page
+   cache than the other pages. The 1st lookup after the 1st index entry of page #13 is appended will likely have
+   to read page #7 and page #10 from disk (page faults), which can take more than a second. In our test, this can
+   cause the at-least-once produce latency to jump from a few ms to about 1 second.
+
+   Here, we use a more cache-friendly lookup algorithm:
+   if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
+      binarySearch(end - N, end)
+   else
+      binarySearch(begin, end - N)
+
+   If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the
+   in-sync lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently
+   look up in this relatively small section, the pages containing it are more likely to be in the page cache.
+
+   We set N (_warmEntries) so that the warm section covers 8192 bytes (N = 8192 / entrySize), because
+   1. 8192 bytes is small enough that all pages of the warm section are touched in every warm-section lookup, so
+   the entire warm section really stays "warm". In a warm-section lookup, the following 3 entries are always
+   touched: indexEntry(end), indexEntry(end - N), and indexEntry((end*2 - N)/2). If the page size is >= 4096, all
+   the warm-section pages (3 or fewer) are touched when we touch those 3 entries. As of 2018, 4096 is the smallest
+   page size of all modern processors (x86-32, x86-64, MIPS, SPARC, Power, ARM, etc.).
+   2. 8192 bytes is large enough that most in-sync lookups fall in the warm section. With default Kafka settings,
+   an 8KB index covers about 4MB (offset index) or 2.7MB (time index) of log messages.
+
+   We can't make the warm section larger than 8192 bytes, as there is no simple way to guarantee that all the warm
+   section pages are really warm (touched in every lookup) on a typical host with 4096-byte pages.
+
+   In the future, we may use a background thread to periodically touch the entire warm section, so that we can
+   1) support a larger warm section, and 2) make sure the warm sections of low-QPS topic-partitions stay warm.
+   */
+  protected def _warmEntries: Int = 8192 / entrySize
+
   protected val lock = new ReentrantLock
 
   @volatile
@@ -311,26 +369,35 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     if(_entries == 0)
       return (-1, -1)
 
+    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
+      // binary search for the entry
+      var lo = begin
+      var hi = end
+      while(lo < hi) {
+        val mid = ceil(hi/2.0 + lo/2.0).toInt
+        val found = parseEntry(idx, mid)
+        val compareResult = compareIndexEntry(found, target, searchEntity)
+        if(compareResult > 0)
+          hi = mid - 1
+        else if(compareResult < 0)
+          lo = mid
+        else
+          return (mid, mid)
+      }
+      (lo, if (lo == _entries - 1) -1 else lo + 1)
+    }
+
+    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
+    // check if the target offset is in the warm section of the index
+    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
+      return binarySearch(firstHotEntry, _entries - 1)
+    }
+
     // check if the target offset is smaller than the least offset
     if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
       return (-1, 0)
 
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while(lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = parseEntry(idx, mid)
-      val compareResult = compareIndexEntry(found, target, searchEntity)
-      if(compareResult > 0)
-        hi = mid - 1
-      else if(compareResult < 0)
-        lo = mid
-      else
-        return (mid, mid)
-    }
-
-    (lo, if (lo == _entries - 1) -1 else lo + 1)
+    return binarySearch(0, firstHotEntry)
   }
 
   private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
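As a closing sanity check of the sizing argument in the new comment block: with 8-byte offset-index entries, 12-byte time-index entries, and the default index.interval.bytes of 4096, an 8192-byte warm section indeed covers about 4MB of log for the offset index and about 2.7MB for the time index. A small sketch (illustrative, not part of the commit):

    // Back-of-envelope check of the 8192-byte warm-section choice.
    // Assumptions: 8-byte offset-index entries, 12-byte time-index entries,
    // one index entry per 4 KiB of messages (default index.interval.bytes).
    object WarmSectionMath {
      def main(args: Array[String]): Unit = {
        val warmBytes = 8192
        val indexIntervalBytes = 4096
        for ((name, entrySize) <- Seq("offset index" -> 8, "time index" -> 12)) {
          val warmEntries = warmBytes / entrySize
          val coveredLogBytes = warmEntries.toLong * indexIntervalBytes
          println(f"$name: _warmEntries = $warmEntries, covers ~${coveredLogBytes / 1024.0 / 1024.0}%.1f MB of log")
        }
      }
    }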
