KAFKA-12675: improve the sticky general assignor scalability and performance #10552

Merged · 6 commits into apache:trunk on Jun 2, 2021

Conversation

@showuon (Contributor Author) commented Apr 17, 2021:

I refactored and optimized the code in this PR while keeping the same algorithm. I've achieved:

  1. Originally, with this setting:
topicCount = 50;
partitionCount = 800;
consumerCount = 800;

we completed in 10 seconds; after my code refactor, the time is down to 100~200 ms.

  2. With the 1 million partitions setting:
topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;

no OutOfMemoryError is thrown anymore, and the assignment takes 4~5 seconds.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
TopicPartition topicPartition = new TopicPartition(topic, i);
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
@showuon (Contributor Author) commented Apr 17, 2021:

refactor 1:
We used to have two maps, consumer2AllPotentialPartitions and partition2AllPotentialConsumers. They needed a lot of memory: for example, consumer2AllPotentialPartitions would need 2000 maps, each containing 1M partitions (assuming 1 million partitions and 2000 consumers). In fact, we only need to store the topics of each potential partition/consumer and combine that with partitionsPerTopic, so I changed them to topic2AllPotentialConsumers and consumer2AllPotentialTopics. This saves both memory and time (see the sketch below).
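A minimal sketch of the topic-level bookkeeping, assuming the surrounding names from this PR (subscriptions, partitionsPerTopic, Subscription); it is illustrative, not the exact merged code:

```java
// Track potential ownership per topic rather than per partition. With 2000
// consumers and ~1M partitions, per-partition maps explode in size, while
// per-topic maps stay proportional to the topic count (e.g. 500 topics).
Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>();  // topic -> candidate consumers
Map<String, List<String>> consumer2AllPotentialTopics = new HashMap<>();  // consumer -> subscribed topics

for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
    String consumerId = entry.getKey();
    List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics());
    consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
    for (String topic : subscribedTopics)
        topic2AllPotentialConsumers.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumerId);
}

// Concrete partitions are expanded from partitionsPerTopic only when needed:
// for (int i = 0; i < partitionsPerTopic.get(topic); i++) ... new TopicPartition(topic, i) ...
```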

* @param toBeRemovedPartitions: sorted partitions, all of which are included in sortedPartitions
* @return the partitions not assigned to any current consumer
*/
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
@showuon (Contributor Author) commented Apr 17, 2021:

refactor 2:
We used to build an ArrayList of unassignedPartitions containing all sorted partitions (e.g., 1 million), then loop through the current assignment to remove the already-assigned partitions, e.g., 999,000 of them, leaving only 1,000. However, removing an element from a huge ArrayList is slow: it must first find the element and then arraycopy the remainder into an array of size (originalSize - 1). This situation arises a lot, since on each rebalance we usually have only a small set of changes (e.g., 1 consumer dropped), so this is an important improvement.

To refactor it, I used the two-pointer technique to walk two sorted lists, sortedPartitions and sortedToBeRemovedPartitions, and add only the difference of the two lists. Looping and appending are very fast on an ArrayList, so it improves a lot (see the sketch below).
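A sketch of that two-pointer set difference, with the method shape modeled on the PR's getUnassignedPartitions (the exact merged signature may differ); both lists must be sorted by the same comparator:

```java
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
                                                     List<TopicPartition> sortedToBeRemovedPartitions) {
    List<TopicPartition> unassignedPartitions = new ArrayList<>();
    int removedIndex = 0;
    TopicPartition nextToRemove = sortedToBeRemovedPartitions.isEmpty()
        ? null : sortedToBeRemovedPartitions.get(removedIndex);
    for (TopicPartition partition : sortedPartitions) {
        if (partition.equals(nextToRemove)) {
            // already assigned: advance the removal pointer instead of calling List.remove
            removedIndex++;
            nextToRemove = removedIndex < sortedToBeRemovedPartitions.size()
                ? sortedToBeRemovedPartitions.get(removedIndex) : null;
        } else {
            unassignedPartitions.add(partition);
        }
    }
    return unassignedPartitions;
}
```

This turns O(n·m) removals into a single O(n) pass with cheap appends.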

Contributor: Sounds great!

Collections.sort(toBeRemovedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
unassignedPartitions = getUnassignedPartitions(sortedPartitions, toBeRemovedPartitions);
} else {
unassignedPartitions = sortedPartitions;
@showuon (Contributor Author):

We use unassignedPartitions and sortedPartitions as the base lists, so for a brand-new assignment we make them refer to the same list to save memory.

private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions,
Map<String, List<TopicPartition>> currentAssignment,
Map<TopicPartition, ConsumerGenerationPair> prevAssignment) {
// we need to process subscriptions' user data with each consumer's reported generation in mind
// higher generations overwrite lower generations in case of a conflict
// note that a conflict could exist only if user data is for different generations

// for each partition we create a sorted map of its consumers by generation
Map<TopicPartition, TreeMap<Integer, String>> sortedPartitionConsumersByGeneration = new HashMap<>();
@showuon (Contributor Author) commented Apr 17, 2021:

refactor 3:
We used to build a sortedPartitionConsumersByGeneration map holding every partition with all of its generation/consumer pairs in order to compute the currentAssignment and prevAssignment. That took a lot of memory and slowed down the calculation. I improved it by computing the currentAssignment and prevAssignment while looping through the subscriptions list (refer to the allSubscriptionsEqual method :)). A sketch of the single-pass idea follows.
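A hedged sketch of that single pass, assuming the MemberData type, the memberData(...) accessor, and DEFAULT_GENERATION from AbstractStickyAssignor; the demotion step and map names here are illustrative, not the merged code:

```java
// One pass over subscriptions: fold each member's (generation, partitions) pair
// into the current/previous assignment while tracking the max generation seen,
// instead of first materializing a per-partition TreeMap of all generations.
int maxGeneration = DEFAULT_GENERATION;
Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
Map<String, List<TopicPartition>> previousCandidates = new HashMap<>(); // illustrative name

for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
    String consumer = entry.getKey();
    MemberData memberData = memberData(entry.getValue());
    int generation = memberData.generation.orElse(DEFAULT_GENERATION);

    if (generation > maxGeneration) {
        // a newer generation appeared: everything collected so far becomes "previous"
        previousCandidates.putAll(currentAssignment);
        currentAssignment.clear();
        maxGeneration = generation;
    }
    if (generation == maxGeneration)
        currentAssignment.put(consumer, memberData.partitions);
    else
        previousCandidates.put(consumer, memberData.partitions);
}
```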

List<String> allTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(allTopics, new TopicComparator(topic2AllPotentialConsumers));

// since allTopics are sorted, we can loop through allTopics to create the sortedPartitions
@showuon (Contributor Author) commented Apr 17, 2021:

refactor 4: To build the sortedPartitions list, we used to sort all of the partitions. To improve this, I sort all topics first (only 500 topics to sort, compared with the original 1 million partitions) and then add the partitions by looping through the sorted topics, as in the sketch below.
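A sketch under the PR's names (TopicComparator, topic2AllPotentialConsumers, partitionsPerTopic; totalPartitionsCount is assumed precomputed); the result comes out sorted because partition indexes are already ascending within each topic:

```java
// Sort a few hundred topics instead of ~1M partitions, then expand in order.
List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));

List<TopicPartition> sortedPartitions = new ArrayList<>(totalPartitionsCount);
for (String topic : sortedAllTopics)
    for (int i = 0; i < partitionsPerTopic.get(topic); i++)
        sortedPartitions.add(new TopicPartition(topic, i));
```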

Contributor: SGTM.

}
for (int i = 0; i < consumerCount; i++) {
if (i == consumerCount - 1) {
subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics.subList(0, 1)));
@showuon (Contributor Author):

Subscribe to only one topic for the last consumer.

@showuon (Contributor Author) commented Apr 17, 2021:

@ableegoldman, please help review this PR. Thank you.

twmb added a commit to twmb/franz-go that referenced this pull request Apr 17, 2021
As @showuon pointed out in github.com/apache/kafka/pull/10552,
tracking partitionPotentials (in Java, partition2AllPotentialConsumers)
is a huge waste of memory when we only need to know potential topic
consumers. If we knock that out, we knock out the most expensive
allocation as well as a lot of hot looping.

Also, if the members are consistently heap sorted by the least loaded
member, then assigning partitions gets much faster.

Lastly, we can knock out more allocations by getting rid of partNum.
This does unfortunately slow things down in the complex graph case, but
that only happened in one benchmark. Overall it may be a wash.
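The least-loaded heap idea, sketched in Java for illustration only (the referenced change itself is in Go; the assignment and unassignedPartitions variables are assumed to exist):

```java
// Keep members in a min-heap keyed by current assignment size, so "give the
// next partition to the least loaded member" is O(log n) instead of a scan.
PriorityQueue<Map.Entry<String, List<TopicPartition>>> leastLoaded = new PriorityQueue<>(
    Comparator.comparingInt((Map.Entry<String, List<TopicPartition>> e) -> e.getValue().size()));
leastLoaded.addAll(assignment.entrySet());

for (TopicPartition partition : unassignedPartitions) {
    Map.Entry<String, List<TopicPartition>> member = leastLoaded.poll(); // least loaded member
    member.getValue().add(partition);
    leastLoaded.add(member); // re-insert so the heap reorders by the new load
}
```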

name                           old time/op    new time/op    delta
Large-8                          9.32ms ± 1%    4.95ms ± 1%   -46.89%  (p=0.000 n=10+10)
LargeWithExisting-8              15.7ms ± 1%    12.8ms ± 1%   -18.66%  (p=0.000 n=10+10)
LargeImbalanced-8                25.7ms ±27%   144.7ms ±14%  +462.35%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8    15.7ms ± 1%    12.7ms ± 1%   -19.38%  (p=0.000 n=10+10)
Java/large-8                      2.63s ± 1%     0.40s ± 2%   -84.72%  (p=0.000 n=10+10)
Java/large_imbalance-8            13.4s ± 5%      0.5s ± 3%   -96.62%  (p=0.000 n=9+10)
Java/medium-8                    70.9ms ± 1%    17.9ms ± 1%   -74.73%  (p=0.000 n=10+9)
Java/medium_imbalance-8           216ms ± 1%      22ms ± 1%   -89.78%  (p=0.000 n=10+9)
Java/small-8                     49.3ms ± 0%    14.4ms ± 1%   -70.79%  (p=0.000 n=10+10)
Java/small_imbalance-8            149ms ± 0%      17ms ± 1%   -88.46%  (p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
Large-8                          7.12MB ± 0%    4.43MB ± 0%   -37.73%  (p=0.000 n=10+10)
LargeWithExisting-8              9.60MB ± 0%    6.94MB ± 0%   -27.71%  (p=0.000 n=9+9)
LargeImbalanced-8                17.0MB ± 0%     4.7MB ± 1%   -72.09%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8    9.60MB ± 0%    7.00MB ± 0%   -27.13%  (p=0.000 n=10+9)
Java/large-8                      531MB ± 0%     441MB ± 0%   -17.09%  (p=0.000 n=10+9)
Java/large_imbalance-8           8.54GB ± 0%    0.50GB ± 0%   -94.10%  (p=0.000 n=10+7)
Java/medium-8                    22.5MB ± 0%    17.1MB ± 0%   -23.90%  (p=0.000 n=10+10)
Java/medium_imbalance-8           223MB ± 0%      33MB ± 0%   -85.08%  (p=0.000 n=10+10)
Java/small-8                     18.8MB ± 0%    13.7MB ± 0%   -27.32%  (p=0.000 n=10+10)
Java/small_imbalance-8            147MB ± 0%      24MB ± 0%   -83.67%  (p=0.000 n=9+10)

name                           old allocs/op  new allocs/op  delta
Large-8                           9.56k ± 0%     9.44k ± 0%    -1.31%  (p=0.000 n=9+10)
LargeWithExisting-8               34.0k ± 0%     34.0k ± 1%    -0.18%  (p=0.002 n=9+10)
LargeImbalanced-8                 9.93k ± 0%     9.82k ± 1%    -1.14%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8     33.8k ± 0%     33.8k ± 0%      ~     (p=0.183 n=10+10)
Java/large-8                      1.04M ± 0%     1.04M ± 0%      ~     (p=0.968 n=10+9)
Java/large_imbalance-8            1.04M ± 0%     1.04M ± 0%    -0.18%  (p=0.000 n=10+8)
Java/medium-8                     56.1k ± 0%     56.1k ± 0%      ~     (p=0.127 n=10+10)
Java/medium_imbalance-8           56.1k ± 0%     56.1k ± 0%      ~     (p=0.473 n=10+8)
Java/small-8                      44.9k ± 0%     44.9k ± 0%      ~     (p=0.468 n=10+10)
Java/small_imbalance-8            44.9k ± 0%     44.9k ± 0%    -0.11%  (p=0.000 n=8+10)
@showuon (Contributor Author) commented Apr 19, 2021:

The performance comparison in Jenkins for the uniform subscription and non-equal subscription cases with this setting:

topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 13 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 17 sec | Passed
Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 14 sec | Passed

Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.4 sec | Passed
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.3 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.9 sec | Passed

I think that after this PR the performance is acceptable for the non-equal subscription cases. We can make incremental improvements in follow-up stories. Thank you.

@showuon (Contributor Author) commented Apr 19, 2021:

The failed tests are all flaky and unrelated. Thanks.
The fix for the flaky MirrorConnectorsIntegration tests is in another PR of mine: #10547.
The root cause of the flaky RaftClusterTest tests will be addressed in KAFKA-12677.

    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testAddingWorker
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
    Build / JDK 11 and Scala 2.13 / kafka.server.ControllerMutationQuotaTest.testStrictDeleteTopicsRequest()
    Build / JDK 11 and Scala 2.13 / kafka.server.ReplicationQuotasTest.shouldThrottleOldSegments()
    Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithNotAllowedOverridesForPrincipalPolicy
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()

@showuon (Contributor Author) commented Apr 20, 2021:

@guozhangwang @ableegoldman, the PR is ready for review. Thank you. :)

@guozhangwang (Contributor) left a comment:

One minor question: do we want to preserve the exact logic of prepopulateCurrentAssignments in this refactoring? I'm not sure that's the case --- I'm fine if we don't really want to preserve that logic and just want to do it in a more efficient way; just bringing this up for clarification.

for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
for (int i = 0; i < entry.getValue(); ++i)
partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
Contributor:

Hmm is this right? Wouldn't we put the same empty list for the key N times?

Contributor Author:

Nice catch! Updated. Thank you.
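For reference, a sketch of one way to avoid the repeated puts, hoisting the topic-level initialization out of the per-partition loop (names as in the quoted snippet; the merged fix may differ):

```java
for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
    String topic = entry.getKey();
    // initialize the topic's consumer list exactly once, not once per partition
    topic2AllPotentialConsumers.put(topic, new ArrayList<>());
    for (int i = 0; i < entry.getValue(); ++i)
        partition2AllPotentialConsumers.put(new TopicPartition(topic, i), new ArrayList<>());
}
```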


@@ -362,23 +365,36 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // otherwise (the consumer still exists)
 for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
     TopicPartition partition = partitionIter.next();
-    if (!partition2AllPotentialConsumers.containsKey(partition)) {
+    if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
Contributor:

nit: the following comment needs to be updated as well.

ownedPartitions.addAll(memberData.partitions);
} else if (!memberData.generation.isPresent()) {
// current maxGeneration is larger than DEFAULT_GENERATION,
// put all partitions as DEFAULT_GENERATION into provAssignment
Contributor:

typo: prev.

Contributor:

I'm not 100% sure the refactored code now has exactly the same logic as the old code, since its branching conditions have changed substantially. E.g., do we still detect whether a partition is assigned to different consumers within a generation?

@showuon (Contributor Author) commented May 10, 2021:

My refactor just aims to produce the same currentAssignment and prevAssignment as before. So, if you meant this:

if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) {
    // same partition is assigned to two consumers during the same rebalance.
    // log a warning and skip this record
    log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.",
        partition, memberData.generation);

I think this check is unnecessary since we didn't do anything with it, and we can't do anything with it either. That's my thought. Thanks.

Contributor:

Sounds good.


@showuon (Contributor Author) commented May 10, 2021:

I'll make some refinements to this PR. Please wait a while. Thanks.

@@ -40,6 +39,7 @@
private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);

public static final int DEFAULT_GENERATION = -1;
public int maxGeneration = DEFAULT_GENERATION;
Contributor Author:

Put maxGeneration into class scope so we can reuse it in prepopulateCurrentAssignments.

 && subscribedTopics.containsAll(subscription.topics()))) {
-    return false;
+    isAllSubscriptionsEqual = false;
Contributor Author:

Now we'll run through all the subscriptions, since consumerToOwnedPartitions will also be passed into generalAssign.

int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
List<TopicPartition> sortedAllPartitions = getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
Contributor Author:

Reuse getAllTopicPartitions from constrainedAssign.

* @param topic2AllPotentialConsumers: topics mapped to all the consumers subscribed to them
* @return the partitions not assigned to any current consumer
*/
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
@showuon (Contributor Author) commented May 11, 2021:

Put the two getUnassignedPartitions overloads (this one and the following one) together for readability.

if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
// If the current member's generation is lower than maxGeneration, put into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent()) {
Contributor Author:

Since we already have currentAssignment from the allSubscriptionsEqual method, as well as the maxGeneration data, we can simplify the logic here to handle prevAssignment only.

@showuon (Contributor Author) commented May 11, 2021:

@ableegoldman, I've done the code refinement and refactoring. It's basically what we discussed in the constrained assignor PR. Please take a look when available.
cc @guozhangwang, welcome to take another look.
Thank you!

@showuon (Contributor Author) commented May 11, 2021:

I saw that some cooperative sticky tests failed. I'll fix them and add more tests tomorrow or later. Thanks.

@showuon (Contributor Author) commented May 18, 2021:

The broken tests are fixed, and new multiple-generation tests have been added for the unequal subscription cases. Thanks.

@guozhangwang (Contributor) left a comment:

I don't have further comments here. Could @vahidhashemian (author of the sticky algorithm) or @ableegoldman lend another pair of eyes before we proceed? Thanks.

@ableegoldman (Contributor):

It's on my list to review in the next couple of weeks, if not sooner. Sorry I have not had time to get to this one yet, but I will 🙂 (and I agree we should also get feedback from @vahidhashemian if he sees this)

@vahidhashemian (Contributor):

Thanks for tagging me @guozhangwang @ableegoldman.
I'll try to review this within the next few days.

@showuon (Contributor Author) commented May 21, 2021:

Thank you, guys! :)

@vahidhashemian (Contributor) left a comment:

Thanks for this detailed work and improvement. Left some initial comments.


// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
assignedPartitions = null;
Contributor:

Is this null assignment needed? Don't see the variable used after this.

Contributor Author:

Yes, it just tells the GC that this memory can be freed, to avoid OOM. I know that by this step we should have already allocated all the memory we need, but it's just in case. What do you think?

Contributor:

So this is assuming the following balance() call could run beyond the next GC?
In that case imho assignedPartitions.clear() would look better (having almost the same impact).

Contributor Author:

Yes, assignedPartitions.clear() would have the same impact, but it loops through the whole backing array and nulls out the elements one by one. I think we can either null the reference or remove this line. What do you think?

```java
/**
 * Removes all of the elements from this list.  The list will
 * be empty after this call returns.
 */
public void clear() {
    modCount++;
    final Object[] es = elementData;
    for (int to = size, i = size = 0; i < to; i++)
        es[i] = null;
}
```

@showuon (Contributor Author) commented May 24, 2021:

@vahidhashemian, thanks for your comments. I've updated. Please take a look again. Thank you.



Comment on lines +581 to 584
if (subscription.userData() != null) {
// since this is our 2nd time deserializing memberData, rewinding userData is necessary
subscription.userData().rewind();
}
Contributor:

This block didn't exist before, why is it needed now?

Contributor Author:

This is actually a bug introduced after constrainedAssign was implemented. Since then, we run allSubscriptionsEqual to decide whether to use constrainedAssign or generalAssign. In allSubscriptionsEqual we not only check whether the subscriptions are equal but also deserialize the user data. So once the userData (a ByteBuffer) has been deserialized, its position has moved to the end of the buffer, and we have to rewind it here. The standalone snippet below illustrates the effect.
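A self-contained illustration (not PR code) of why the rewind is needed: reading a ByteBuffer advances its position, so a second deserialization from the same buffer sees no remaining bytes unless the position is reset.

```java
import java.nio.ByteBuffer;

public class RewindDemo {
    public static void main(String[] args) {
        ByteBuffer userData = ByteBuffer.allocate(8);
        userData.putInt(42).putInt(7).flip();       // fill, then make readable

        userData.getInt();                          // first read: position -> 4
        userData.getInt();                          // position -> 8 (end of buffer)
        System.out.println(userData.remaining());   // 0: a second pass would fail

        userData.rewind();                          // position -> 0, limit unchanged
        System.out.println(userData.getInt());      // 42: second deserialization works
    }
}
```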

@showuon (Contributor Author) commented May 25, 2021:

@vahidhashemian, thanks for the comments. I've updated. Please take a look again. Thank you.

@vahidhashemian (Contributor):

Thanks for addressing my comments @showuon. I ran a couple of the unit tests and saw the difference this change makes.
I have no further comments at this time. Given this is a big change, I'd wait for @ableegoldman's review before approval. In the meantime, I may test some scenarios for additional validation.

@showuon (Contributor Author) commented May 26, 2021:

@vahidhashemian , thank you for your review! :)

@vahidhashemian (Contributor) left a comment:

I had a chance to run some tests against this PR and they all went fine. +1 from me. Thanks again for the improvements.

@guozhangwang (Contributor):

The failed tests are unrelated to this PR; I'm merging to trunk now.

@guozhangwang guozhangwang merged commit 6db51e4 into apache:trunk Jun 2, 2021
@guozhangwang (Contributor):

Thank you @showuon !!
