KAFKA-12675: improve the sticky general assignor scalability and performance #10552
Conversation
```java
for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
    TopicPartition topicPartition = new TopicPartition(topic, i);
    consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
    partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
```
refactor 1:
We used to keep two maps, `consumer2AllPotentialPartitions` and `partition2AllPotentialConsumers`. That needed a lot of memory: with 1 million partitions and 2,000 consumers, `consumer2AllPotentialPartitions` would hold 2,000 entries, each containing a list of 1 million partitions. In fact, we only need to store the topics for each potential consumer (and vice versa), combined with `partitionsPerTopic`, so I changed these to `topic2AllPotentialConsumers` and `consumer2AllPotentialTopics`. This saves both memory and time.
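As a minimal sketch of the idea (with hypothetical, simplified types: plain `String` names instead of the real `TopicPartition`/`Subscription` classes), storing potential assignments at the topic level keeps one entry per topic rather than one per partition, and the per-partition view stays derivable from `partitionsPerTopic`:

```java
import java.util.*;

public class TopicLevelPotentials {
    // Build topic -> potential consumers from consumer -> potential topics.
    // With 1M partitions across ~500 topics and 2,000 consumers, this holds
    // ~500 topic entries instead of 1M partition entries per direction.
    static Map<String, List<String>> topic2AllPotentialConsumers(
            Map<String, List<String>> consumer2AllPotentialTopics) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : consumer2AllPotentialTopics.entrySet())
            for (String topic : e.getValue())
                result.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> consumer2AllPotentialTopics = new LinkedHashMap<>();
        consumer2AllPotentialTopics.put("c1", Arrays.asList("topicA", "topicB"));
        consumer2AllPotentialTopics.put("c2", Arrays.asList("topicA"));

        Map<String, List<String>> t2c = topic2AllPotentialConsumers(consumer2AllPotentialTopics);
        // Every partition of topicA shares the same potential-consumer list:
        System.out.println(t2c.get("topicA")); // [c1, c2]
        System.out.println(t2c.get("topicB")); // [c1]
    }
}
```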
```java
 * @param toBeRemovedPartitions: sorted partitions, all of which are included in the sortedPartitions
 * @return the partitions that are not assigned to any current consumer
 */
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
```
refactor 2:
We used to build an ArrayList of `unassignedPartitions` containing all sorted partitions (e.g. 1 million), then loop through the current assignment and remove the already-assigned partitions (e.g. 999,000 of them), leaving only 1,000. However, removing elements from a huge ArrayList is slow: each removal first searches for the element, then array-copies the remaining (originalSize - 1) elements. This case happens a lot, since each rebalance usually involves only a small set of changes (e.g. one consumer dropped), so this is an important improvement.
To refactor it, I used the two-pointer technique to walk the two sorted lists, `sortedPartitions` and `sortedToBeRemovedPartitions`, and only add the difference of the two lists to the result. Iterating and appending are both fast on an ArrayList, so this improves things a lot.
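A minimal sketch of the two-pointer difference described above (hypothetical names, using `Integer` elements in place of `TopicPartition`; it assumes both lists are sorted by the same comparator and `toRemove` is a subset of `sorted`, as stated in the javadoc):

```java
import java.util.*;

public class SortedDifference {
    // Single linear pass: advance the "remove" pointer whenever the current
    // element matches, otherwise append. No searches, no array copies.
    static <T> List<T> difference(List<T> sorted, List<T> toRemove) {
        List<T> result = new ArrayList<>(sorted.size() - toRemove.size());
        int r = 0;
        for (T item : sorted) {
            if (r < toRemove.size() && item.equals(toRemove.get(r)))
                r++;                // skip: this element is being removed
            else
                result.add(item);   // keep: O(1) append
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Integer> remove = Arrays.asList(2, 5);
        System.out.println(difference(all, remove)); // [1, 3, 4, 6]
    }
}
```

This is O(n) overall, versus O(n·m) for m `ArrayList.remove` calls on a list of n elements.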
Sounds great!
```java
    Collections.sort(toBeRemovedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
    unassignedPartitions = getUnassignedPartitions(sortedPartitions, toBeRemovedPartitions);
} else {
    unassignedPartitions = sortedPartitions;
```
We use `unassignedPartitions` and `sortedPartitions` as the base lists, so in the brand-new-assignment case we make them refer to the same list to save memory.
```java
private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions,
                                           Map<String, List<TopicPartition>> currentAssignment,
                                           Map<TopicPartition, ConsumerGenerationPair> prevAssignment) {
    // we need to process subscriptions' user data with each consumer's reported generation in mind
    // higher generations overwrite lower generations in case of a conflict
    // note that a conflict could exist only if user data is for different generations

    // for each partition we create a sorted map of its consumers by generation
    Map<TopicPartition, TreeMap<Integer, String>> sortedPartitionConsumersByGeneration = new HashMap<>();
```
refactor 3:
We used to keep a `sortedPartitionConsumersByGeneration` map storing every partition with all of its generation/consumer pairs, in order to compute `currentAssignment` and `prevAssignment`. That takes a lot of memory and slows down the calculation. Improve it by computing `currentAssignment` and `prevAssignment` while looping over the subscriptions list (as in the `allSubscriptionsEqual` method :)).
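A rough sketch of the single-pass idea (hypothetical, heavily simplified types: `String` partition names and a small `Owner` record instead of the real member data), keeping only the highest-generation owner per partition instead of a full per-partition `TreeMap`:

```java
import java.util.*;

public class SinglePassGenerations {
    record Owner(String consumer, int generation) {}

    // One pass over the reported subscriptions: the highest generation seen
    // for a partition stays in "current"; displaced or lower-generation
    // owners go to "prev" (keeping the highest of those).
    static Map<String, Owner> resolve(Map<String, Map.Entry<Integer, List<String>>> reported,
                                      Map<String, Owner> prev) {
        Map<String, Owner> current = new HashMap<>();
        for (var e : reported.entrySet()) {
            int gen = e.getValue().getKey();
            for (String partition : e.getValue().getValue()) {
                Owner existing = current.get(partition);
                if (existing == null || gen > existing.generation()) {
                    if (existing != null) prev.put(partition, existing);
                    current.put(partition, new Owner(e.getKey(), gen));
                } else {
                    prev.merge(partition, new Owner(e.getKey(), gen),
                        (a, b) -> a.generation() >= b.generation() ? a : b);
                }
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Map.Entry<Integer, List<String>>> reported = new LinkedHashMap<>();
        reported.put("c1", Map.entry(2, List.of("t-0", "t-1")));
        reported.put("c2", Map.entry(1, List.of("t-1", "t-2")));

        Map<String, Owner> prev = new HashMap<>();
        Map<String, Owner> current = resolve(reported, prev);
        System.out.println(current.get("t-1").consumer()); // c1 (higher generation)
        System.out.println(prev.get("t-1").consumer());    // c2
    }
}
```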
```java
List<String> allTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(allTopics, new TopicComparator(topic2AllPotentialConsumers));

// since allTopics are sorted, we can loop through allTopics to create the sortedPartitions
```
refactor 4: To build the `sortedPartitions` list, we used to sort all of the partitions. To improve it, I sort the topics first (only 500 topics to sort, compared to the original 1 million partitions), and then append the partitions by looping over the sorted topics.
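A small sketch of the trick (hypothetical names, `String` partition labels instead of `TopicPartition`): since the partitions of one topic are already ordered by index, expanding sorted topics yields a fully sorted partition list for O(T log T) sorting cost instead of O(N log N):

```java
import java.util.*;

public class SortTopicsNotPartitions {
    static List<String> sortedPartitions(Map<String, Integer> partitionsPerTopic) {
        // Sort only the topics (e.g. 500 of them), not the partitions (e.g. 1M)
        List<String> sortedTopics = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(sortedTopics);

        // Expand each sorted topic into its partitions, in index order
        List<String> result = new ArrayList<>();
        for (String topic : sortedTopics)
            for (int i = 0; i < partitionsPerTopic.get(topic); i++)
                result.add(topic + "-" + i);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("b", 2);
        partitionsPerTopic.put("a", 3);
        System.out.println(sortedPartitions(partitionsPerTopic));
        // [a-0, a-1, a-2, b-0, b-1]
    }
}
```

(The real code sorts with a `TopicComparator` over `topic2AllPotentialConsumers` rather than alphabetically, but the expansion step is the same.)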
SGTM.
```java
}
for (int i = 0; i < consumerCount; i++) {
    if (i == consumerCount - 1) {
        subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics.subList(0, 1)));
```
subscribe to only 1 topic for the last consumer
@ableegoldman , please help review this PR. Thank you.
As @showuon pointed out in github.com/apache/kafka/pull/10552, tracking partitionPotentials (in Java, partition2AllPotentialConsumers) is a huge waste of memory when we only need to know potential topic consumers. If we knock that out, we knock out the most expensive allocation as well as a lot of hot looping. Also, if the members are consistently heap sorted by the least loaded member, then assigning partitions gets much faster. Lastly, we can knock out more allocations by getting rid of partNum. This does unfortunately slow things down in the complex graph case, but that only happened in one benchmark. Overall it may be a wash.

```
name                           old time/op    new time/op    delta
Large-8                          9.32ms ± 1%    4.95ms ± 1%   -46.89%  (p=0.000 n=10+10)
LargeWithExisting-8              15.7ms ± 1%    12.8ms ± 1%   -18.66%  (p=0.000 n=10+10)
LargeImbalanced-8                25.7ms ±27%   144.7ms ±14%  +462.35%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8    15.7ms ± 1%    12.7ms ± 1%   -19.38%  (p=0.000 n=10+10)
Java/large-8                      2.63s ± 1%     0.40s ± 2%   -84.72%  (p=0.000 n=10+10)
Java/large_imbalance-8            13.4s ± 5%      0.5s ± 3%   -96.62%  (p=0.000 n=9+10)
Java/medium-8                    70.9ms ± 1%    17.9ms ± 1%   -74.73%  (p=0.000 n=10+9)
Java/medium_imbalance-8           216ms ± 1%      22ms ± 1%   -89.78%  (p=0.000 n=10+9)
Java/small-8                     49.3ms ± 0%    14.4ms ± 1%   -70.79%  (p=0.000 n=10+10)
Java/small_imbalance-8            149ms ± 0%      17ms ± 1%   -88.46%  (p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
Large-8                          7.12MB ± 0%    4.43MB ± 0%   -37.73%  (p=0.000 n=10+10)
LargeWithExisting-8              9.60MB ± 0%    6.94MB ± 0%   -27.71%  (p=0.000 n=9+9)
LargeImbalanced-8                17.0MB ± 0%     4.7MB ± 1%   -72.09%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8    9.60MB ± 0%    7.00MB ± 0%   -27.13%  (p=0.000 n=10+9)
Java/large-8                      531MB ± 0%     441MB ± 0%   -17.09%  (p=0.000 n=10+9)
Java/large_imbalance-8           8.54GB ± 0%    0.50GB ± 0%   -94.10%  (p=0.000 n=10+7)
Java/medium-8                    22.5MB ± 0%    17.1MB ± 0%   -23.90%  (p=0.000 n=10+10)
Java/medium_imbalance-8           223MB ± 0%      33MB ± 0%   -85.08%  (p=0.000 n=10+10)
Java/small-8                     18.8MB ± 0%    13.7MB ± 0%   -27.32%  (p=0.000 n=10+10)
Java/small_imbalance-8            147MB ± 0%      24MB ± 0%   -83.67%  (p=0.000 n=9+10)

name                           old allocs/op  new allocs/op  delta
Large-8                           9.56k ± 0%     9.44k ± 0%    -1.31%  (p=0.000 n=9+10)
LargeWithExisting-8               34.0k ± 0%     34.0k ± 1%    -0.18%  (p=0.002 n=9+10)
LargeImbalanced-8                 9.93k ± 0%     9.82k ± 1%    -1.14%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8     33.8k ± 0%     33.8k ± 0%      ~     (p=0.183 n=10+10)
Java/large-8                      1.04M ± 0%     1.04M ± 0%      ~     (p=0.968 n=10+9)
Java/large_imbalance-8            1.04M ± 0%     1.04M ± 0%    -0.18%  (p=0.000 n=10+8)
Java/medium-8                     56.1k ± 0%     56.1k ± 0%      ~     (p=0.127 n=10+10)
Java/medium_imbalance-8           56.1k ± 0%     56.1k ± 0%      ~     (p=0.473 n=10+8)
Java/small-8                      44.9k ± 0%     44.9k ± 0%      ~     (p=0.468 n=10+10)
Java/small_imbalance-8            44.9k ± 0%     44.9k ± 0%    -0.11%  (p=0.000 n=8+10)
```
The performance comparison in jenkins for uniform subscription and non-equal subscription with the setting:
I think after this PR, the performance is acceptable for non-equal subscription cases. We can have incremental improvement in the following stories. Thank you.
Failed tests are all flaky and unrelated. Thanks.
@guozhangwang @ableegoldman , PR is ready for review. Thank you. :)
One minor question: do we want to preserve the exact logic of `prepopulateCurrentAssignments` in this refactoring? I'm not sure that it is the case. I'm fine if we do not really want to preserve that logic and just want to do it in a more efficient way; just bringing this up for clarification.
```java
for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
    for (int i = 0; i < entry.getValue(); ++i)
        partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
        topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
```
Hmm is this right? Wouldn't we put the same empty list for the key N times?
Nice catch! Updated. Thank you.
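A minimal sketch of the pitfall discussed above (hypothetical, simplified types: `String` keys in place of the real classes): a `put` that runs once per partition replaces the topic's list N times, discarding anything added in between, while hoisting it out of the inner loop (or using `putIfAbsent`) initializes it exactly once.

```java
import java.util.*;

public class InitListOnce {
    // Buggy shape: the put executes inside the per-partition loop,
    // replacing the list on every iteration.
    static Map<String, List<String>> buggyBuild(Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> m = new HashMap<>();
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            for (int i = 0; i < e.getValue(); ++i) {
                m.put(e.getKey(), new ArrayList<>()); // wipes earlier additions
                m.get(e.getKey()).add("consumer-" + i);
            }
        }
        return m;
    }

    // Fixed shape: initialize the per-topic list exactly once.
    static Map<String, List<String>> fixedBuild(Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> m = new HashMap<>();
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            m.putIfAbsent(e.getKey(), new ArrayList<>());
            for (int i = 0; i < e.getValue(); ++i)
                m.get(e.getKey()).add("consumer-" + i);
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, Integer> ppt = Map.of("topicA", 3);
        System.out.println(buggyBuild(ppt).get("topicA")); // [consumer-2]
        System.out.println(fixedBuild(ppt).get("topicA")); // [consumer-0, consumer-1, consumer-2]
    }
}
```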
```diff
@@ -362,23 +365,36 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
     // otherwise (the consumer still exists)
     for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
         TopicPartition partition = partitionIter.next();
-        if (!partition2AllPotentialConsumers.containsKey(partition)) {
+        if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
```
nit: the following comment needs to be updated as well.
```java
    ownedPartitions.addAll(memberData.partitions);
} else if (!memberData.generation.isPresent()) {
    // current maxGeneration is larger than DEFAULT_GENERATION,
    // put all partitions as DEFAULT_GENERATION into provAssignment
```
typo: prev.
I'm not 100% sure the refactored code has exactly the same logic as the old code, since its branching conditions have changed considerably. E.g., do we still detect whether a partition is assigned to different consumers in the same generation?
My refactor is just trying to reach the same `currentAssignment` and `prevAssignment` as before. So, if you meant this:

```java
if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) {
    // same partition is assigned to two consumers during the same rebalance.
    // log a warning and skip this record
    log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.",
        partition, memberData.generation);
```

I think this check is unnecessary, since we didn't do anything with it, and we cannot do anything about it, either. That's my thought. Thanks.
Sounds good.
I'll have some refinements to this PR. Please wait for a while. Thanks.
```diff
@@ -40,6 +39,7 @@
     private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);

     public static final int DEFAULT_GENERATION = -1;
+    public int maxGeneration = DEFAULT_GENERATION;
```
Put the `maxGeneration` into class scope, so we can re-use it in `prepopulateCurrentAssignments`.
```diff
         && subscribedTopics.containsAll(subscription.topics()))) {
-    return false;
+    isAllSubscriptionsEqual = false;
```
Now we'll run through all the `subscriptions`, since the `consumerToOwnedPartitions` data will also be passed into `generalAssign`.
```java
int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
List<TopicPartition> sortedAllPartitions = getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
```
Reuse the `getAllTopicPartitions` from `constrainedAssign`.
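A rough sketch of a helper in that shape (hypothetical simplified signature, `String` partition labels instead of `TopicPartition`): since `totalPartitionsCount` is known up front, the result list can be allocated at full capacity once, avoiding repeated ArrayList growth while expanding the sorted topics:

```java
import java.util.*;

public class AllTopicPartitions {
    static List<String> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic,
                                              List<String> sortedTopics,
                                              int totalPartitionsCount) {
        // Preallocate to the exact final size: no intermediate array copies
        List<String> all = new ArrayList<>(totalPartitionsCount);
        for (String topic : sortedTopics)
            for (int i = 0; i < partitionsPerTopic.get(topic); i++)
                all.add(topic + "-" + i);
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> ppt = Map.of("a", 2, "b", 1);
        int total = ppt.values().stream().reduce(0, Integer::sum);
        System.out.println(getAllTopicPartitions(ppt, Arrays.asList("a", "b"), total));
        // [a-0, a-1, b-0]
    }
}
```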
```java
 * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to them
 * @return the partitions that are not assigned to any current consumer
 */
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
```
Put the two `getUnassignedPartitions` overloads (this one and the following one) together for readability.
```java
if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
    // If the current member's generation is lower than maxGeneration, put into prevAssignment if needed
    updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent()) {
```
Since we already have `currentAssignment` from the `allSubscriptionsEqual` method, as well as the `maxGeneration` data, we can simplify the logic here to handle `prevAssignment` only.
@ableegoldman , I've done the code refinement and refactor. Basically, it is what we've discussed in the constrained assignor PR. Please take a look when available.
I saw some cooperative sticky tests failed. I'll update them, and add more tests tomorrow or later. Thanks.
Broken tests are fixed, and new multiple-generation tests are added for unequal subscription cases. Thanks.
I do not have further comments here, could @vahidhashemian (author of the sticky algorithm) or @ableegoldman lend me another pair of eyes before we proceed? Thanks.
It's on my list to review in the next couple of weeks, if not sooner. Sorry I have not had time to get to this one yet, but I will 🙂 (and I agree we should also get feedback from @vahidhashemian if he sees this)
Thanks for tagging me @guozhangwang @ableegoldman.
Thank you, guys! :)
Thanks for this detailed work and improvement. Left some initial comments.
```java
// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
assignedPartitions = null;
```
Is this `null` assignment needed? I don't see the variable used after this.
Yes, it just tells the GC that this memory can be freed, to avoid OOM. I know that by this step we should already have allocated all the memory we need, but it's just in case. What do you think?
So this is assuming the following `balance()` call could run beyond the next GC? In that case, imho `assignedPartitions.clear()` would look better (having almost the same impact).
Yes, `assignedPartitions.clear()` would have almost the same impact, but it loops through the whole ArrayList and nullifies the elements one by one. I think we can either `null` it out, or remove this line. What do you think?

```java
/**
 * Removes all of the elements from this list. The list will
 * be empty after this call returns.
 */
public void clear() {
    modCount++;
    final Object[] es = elementData;
    for (int to = size, i = size = 0; i < to; i++)
        es[i] = null;
}
```
@vahidhashemian , thanks for your comments. I've updated. Please take a look again. Thank you.
```java
if (subscription.userData() != null) {
    // since this is our 2nd time deserializing memberData, rewinding userData is necessary
    subscription.userData().rewind();
}
```
This block didn't exist before, why is it needed now?
This is actually a bug introduced after `constrainedAssign` was implemented. Since then, we run `allSubscriptionsEqual` to decide whether to use `constrainedAssign` or `generalAssign`. In `allSubscriptionsEqual`, we not only check whether the subscriptions are equal, but also deserialize the user data. So if it has been deserialized once, the position of the userData `ByteBuffer` will have moved to the end of the buffer, which is why we have to rewind here.
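A small self-contained sketch of the `ByteBuffer` behavior described above (hypothetical payload and helper name): a first read advances the position to the limit, so a second read sees no remaining bytes unless `rewind()` resets the position to zero.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindDemo {
    // Returns {remaining after first full read, remaining after rewind}
    static int[] remainingBeforeAndAfterRewind(ByteBuffer buf) {
        byte[] first = new byte[buf.remaining()];
        buf.get(first);                 // "first deserialization" consumes the buffer
        int afterRead = buf.remaining();
        buf.rewind();                   // reset position for the second deserialization
        return new int[]{afterRead, buf.remaining()};
    }

    public static void main(String[] args) {
        ByteBuffer userData = ByteBuffer.wrap("gen=5".getBytes(StandardCharsets.UTF_8));
        int[] r = remainingBeforeAndAfterRewind(userData);
        System.out.println(r[0]); // 0: nothing left after the first read
        System.out.println(r[1]); // 5: full payload readable again after rewind
    }
}
```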
@vahidhashemian , thanks for the comments. I've updated. Please take a look again. Thank you.
Thanks for addressing my comments @showuon. I tested a couple of unit tests and saw the difference this change makes.
@vahidhashemian , thank you for your review! :)
Had a chance to run some tests against this pr and they all went fine. +1 from me. Thanks again for the improvements.
The failed tests are irrelevant to this PR; I'm merging to trunk now.
Thank you @showuon !!
I did a code refactor/optimization, keeping the same algorithm, in this PR.
I've achieved:
1. What used to complete in 10 seconds now takes 100~200 ms after my code refactor.
2. No OutOfMemory is thrown anymore; that run now takes 4~5 seconds.
Committer Checklist (excluded from commit message)