
KAFKA-6054: Add 'version probing' to Kafka Streams rebalance #4636

Merged
merged 19 commits into apache:trunk from kafka-6054-fix-upgrade-system-test on May 31, 2018

Conversation

@mjsax mjsax commented Mar 2, 2018

No description provided.

@mjsax mjsax added the streams label Mar 2, 2018
@mjsax mjsax requested review from dguy and guozhangwang March 2, 2018 07:39

mjsax commented Mar 2, 2018

@bbejeck @vvcephei

This is a WIP PR that also includes all changes from #4630 -- thus, it must eventually be rebased. I added a new config just for now to test the approach -- this might change after the KIP is done, but I wanted to get an initial passing system test set up (the system test also contains some commented-out code that allows the test to fail -- just FYI -- this part needs cleanup as well).

I also needed to update the Docker setup to get the older jar files we need to run the test locally using Docker.

We could backport this fix to 0.10.1, 0.10.2, 0.11.0, 1.0, and 1.1, too. But cherry-picking won't work, as too much code has changed in between. Basically, the fix consists of setting the correct min-version in the assignment and a config that tells bouncing instances to stay on protocol version 1. So it's quite a change -- not sure if it's worth it, considering that not many people will be using 0.10.0 anymore anyway (having said this, it might not even be required to fix the JIRA at all -- nevertheless, this PR gives an idea of the upcoming version change and how to write a system test for it).

Please share your thoughts.

@mjsax mjsax force-pushed the kafka-6054-fix-upgrade-system-test branch 2 times, most recently from becd580 to af39ef9 on March 6, 2018 01:42

mjsax commented Mar 6, 2018

Rebased this and cleaned up the code. This is a proper fix for the issue, including a system test. It still depends on a KIP that is WIP.

Triggered 10 runs of the new system test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1387/

@@ -196,6 +197,12 @@ public void configure(final Map<String, ?> configs) {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

final String upgradeMode = streamsConfig.getString(StreamsConfig.UPGRADE_MODE_CONFIG);
if (StreamsConfig.UPGRADE_FROM_0100X.equals(upgradeMode)) {
log.debug("Downgrading to metadata version 1 for upgrade from 0.10.0.x.");
@bbejeck bbejeck Mar 9, 2018

Since upgrading won't be a common occurrence, should this be log.info? Just a thought; I don't have a strong opinion either way.

EDIT: Thinking some more, it's fine as debug; ignore my previous comment.

Member Author

I am fine either way -- I made it debug because it's usually not helpful information for the user.

"collect_default": True},
"streams_stderr.1-6": {
"path": STDERR_FILE + ".1-6",
"collect_default": True},
@bbejeck bbejeck Mar 9, 2018

Why all the log configs? I'm guessing separate ones for each version upgrade?

EDIT: I should have looked further down -- there's one for each roll, then?

Member Author

Yes. In line https://github.com/apache/kafka/pull/4636/files#diff-21e906e43e313a665d07a8e1cd61a9c3R330 we move files around to roll them over -- it's super helpful for debugging to know which file belongs to which phase of the test.

'SmokeTest-sum-STATE-STORE-0000000026-changelog' : { 'partitions': self.partitions,
'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'SmokeTest-cnt-STATE-STORE-0000000035-changelog' : { 'partitions': self.partitions,
Contributor

Just playing devil's advocate here: do we want to create the internal topics ahead of time? If we change the SmokeTest streams application, we'll need to update these again.

Member Author

We need to. We use the StreamsEosTestDriverService, which expects a replication factor of 3; however, when we start up the 0.10.0.x version, it would by default create those topics with replication factor 1, and thus the test crashes later on.

Note that we pull in the 0.10.0.1 code as-is and cannot update the Streams app there to change the replication factor config to 3.
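For illustration only, a minimal Java sketch of pre-creating one of these changelog topics with the required replication factor (the actual system test does this through its Python service config shown above; the broker address, partition count, and min.insync.replicas value here are assumptions):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public final class PreCreateChangelogTopic {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // pre-create the topic with replication factor 3 so the old
            // 0.10.0.x app cannot auto-create it with replication factor 1
            final NewTopic changelog =
                new NewTopic("SmokeTest-sum-STATE-STORE-0000000026-changelog", 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2")); // value illustrative
            admin.createTopics(List.of(changelog)).all().get();
        }
    }
}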


bbejeck commented Mar 9, 2018

@mjsax thanks for the work on this; just one minor comment.

Like you said above, we can't merge the changes to the pre-KIP-182 versions, but maybe it's worthwhile to cherry-pick the SmokeTest streams app changes to the 0.11.0 branch and forward. Since we'll be removing the old API code soon, a rolling upgrade system test would then simulate a user going from the old API to the new one.


mjsax commented Mar 9, 2018

@bbejeck -- for old versions, we only pull in officially released artifacts. Thus, as long as there is no bug-fix release for the older versions, backporting does not help :(

@guozhangwang guozhangwang left a comment

The code change generally makes sense to me. Regarding the config names, there are some comments on the KIP itself, which we can continue discussing there.

@@ -317,6 +325,10 @@ public Subscription subscription(final Set<String> topics) {
clientMetadata.addConsumer(consumerId, info);
}

if (minUserMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
log.debug("Downgrading metadata to version " + minUserMetadataVersion);
Contributor

nit: better to log the latest supported version as well.

@mjsax mjsax force-pushed the kafka-6054-fix-upgrade-system-test branch from af39ef9 to 8f00806 on March 12, 2018 00:11

mjsax commented Mar 12, 2018

Updated this.

Also had a look at the system tests. One run failed. The issue is that it can happen that one instance gets only repartition topics assigned. In this case, we never get the expected output "Processes 100 record from topic=" and the test fails, as it thinks the application did not start up. This issue is part of the 0.10.0.1 code and thus we cannot easily change it.

I think there are the following options:

  • weaken the test setup and put a sleep() instead
  • change the test setup and try to get committed offsets to see if an instance started processing
  • change the test setup to grep the log instead of stdout
  • change the test setup to use "new client code" that links to the old jar file (atm, the main jar files and test jar files are "linked" together -- we could put code in place that allows us to write a 0.10.0 application that has its code in trunk but still links to the 0.10.0 lib in the system test)

The last option would give us the most control, though it might be the most difficult to implement. However, if we want more tests like this, it might be worth doing. Atm, being stuck with old client code whenever we want to run an older Streams version that we cannot change seems to be quite a limitation. Decoupling the test code from the library jars would be valuable.

WDYT?


bbejeck commented Mar 12, 2018

@mjsax I'm very much in favor of the last proposal and decoupling the library jars from the test code. I recently ran into the same difficulty with writing a rolling upgrade test against all versions and had to make some trade-offs because we can't change any test code.

@guozhangwang

I'm also in favor of the last approach. But for this specific failure case, even after we have implemented that approach, we still need to figure out how to modify the client code used in the system test suite. If we want to go further in this direction, e.g. (re-)compiling some modules from non-trunk code before the test, we likely need to modify ducktape as well. These should theoretically all be doable, but I'd like us to figure out all the details before digging into it.

@@ -42,6 +44,132 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
"streams_log.0": {
Contributor

Are these additional log files intentional, or for debugging only?

Member Author

Both. Originally, I added this for debugging, but I think it's super useful to keep them. Otherwise, it's hard to inspect the log/stdout/stderr files if multiple processors get bounced (it's hard to tell from a single file which statements belong to which "group generation" -- rolling over the files makes debugging much easier). Thus, I would suggest keeping this change and also applying it to other system tests with a similar pattern.

@vvcephei

Sorry for the radio silence.

For the record, I also think it's a great idea to keep a copy of SmokeTest for each version that we want to test. It will free us up to alter the test scenario and simplify the test orchestration. We also wouldn't need to download the test artifacts anymore, which saves a little testing time.

@@ -340,6 +345,10 @@
public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";

/** {@code upgrade.from} */
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
public static final String UPGRADE_FROM_DOC = "Enables a backward compatible rebalance mode for the first round of rolling bounces. Default is null. Accepted values are \"" + UPGRADE_FROM_0100X + "\" (for upgrading from 0.10.0.x).";
Contributor

Let's be specific about the scope of this config. For example:

Allows versions equal to or older than 1.1.0 to upgrade to newer versions, including 1.2.0, in a backward compatible way. When upgrading from other versions, this config would never need to be specified. Default is null ...
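To make the intended usage concrete, a hedged sketch of a user setting this config for the first rolling bounce (constant names as they appear in this revision of the PR; application id and bootstrap servers are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class UpgradeFromExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "smoke-test-app");    // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        // first rolling bounce: keep sending old-format subscription metadata
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100X);
        // second rolling bounce: remove upgrade.from so instances switch to the new format
        new StreamsConfig(props); // validates the configuration
    }
}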

@vvcephei vvcephei left a comment

Are you ready to start collecting "final" reviews? If so, it looks good to me.


mjsax commented Mar 26, 2018

This PR is a little outdated -- I am going to open a new PR for trunk that will add the new config upgrade.mode, including system tests etc., similar to the other ones. Afterwards, I plan to clean this PR up to implement "version probing" etc.

@mjsax mjsax force-pushed the kafka-6054-fix-upgrade-system-test branch from 69a851d to c7bedb3 on April 7, 2018 19:51
@mjsax mjsax changed the title from "[WIP] KAFKA-6054: Fix Kafka Streams upgrade path for v0.10.0" to "KAFKA-6054: Fix Kafka Streams upgrade path for v0.10.0" on Apr 7, 2018

mjsax commented Apr 7, 2018

Updated this to add a system test for the 1.1 release, increase the metadata to version 3, add version probing, and add a version probing system test.

System test passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1671/

This should be the last PR to complete KIP-268.

Call for review @guozhangwang @bbejeck @vvcephei @dguy

@mjsax mjsax force-pushed the kafka-6054-fix-upgrade-system-test branch from d44014d to 6a077ec on May 25, 2018 17:33

mjsax commented May 25, 2018

Updated this.

A couple of notes on what changed:

  • in the version probing case, the leader does not send an empty assignment back to everybody (because this would trigger the closing of tasks, which we try to avoid) but sends an empty assignment only to the instance that issued the probe request
  • in the version probing case, the leader does not assign "missing tasks" -- the instance that gets upgraded does not have any tasks assigned and sends an empty "prev task" map, which would lead to task migration from the upgrading instance to the other instances. Instead, on version probing, the leader sends the exact old assignment back to the non-upgrading instances and might leave some tasks unassigned (empty assignment for the upgrading instance). This prevents the task migration we want to avoid, because the upgrading instance will trigger a second rebalance, and in this second rebalance we can assign all tasks without task migration.
  • in the version probing case, if the leader is already upgraded, it encodes a downgraded "supported version" instead of its actual supported version in the general case -- this prevents already upgraded instances from upgrading their metadata to the new version; thus, as long as at least one old instance is there, all instances use the old metadata format. Only if all instances support the new version does the leader encode its latest supported version, which triggers the metadata upgrade. This happens when the last instance is bounced -- additionally, when the bounced instance triggers a second rebalance (because version probing was detected), all instances automatically upgrade to the new protocol. Hence, we don't need the leader to trigger an additional rebalance; we get the upgrade "for free" (see the sketch after this list)
  • updated the system test accordingly. Note that we always need to know who the leader is, and that a "forceRebalance()" can trigger a leader change
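To make the version-probing upgrade rule concrete, here is a minimal sketch (all names hypothetical; not the actual StreamsPartitionAssignor code): the leader advertises the lowest version supported by any group member and only switches to the latest version once every member supports it.

import java.util.Set;

public final class VersionProbingSketch {
    static final int LATEST_SUPPORTED_VERSION = 3;

    // Version the leader encodes as "supported version" into each assignment.
    static int versionToEncode(final Set<Integer> membersSupportedVersions) {
        // As long as one member is on an older protocol, everybody stays on it;
        // once the last old instance is bounced, the group upgrades "for free".
        return membersSupportedVersions.stream()
            .min(Integer::compareTo)
            .orElse(LATEST_SUPPORTED_VERSION);
    }

    public static void main(final String[] args) {
        System.out.println(versionToEncode(Set.of(2, 3))); // 2 -- mixed group
        System.out.println(versionToEncode(Set.of(3, 3))); // 3 -- all upgraded
    }
}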

Also did some additional code cleanup.


mjsax commented May 25, 2018

Triggered system test (50 runs): https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1758/

@bbejeck bbejeck left a comment

I took a pass; overall it looks good, I just have a couple of questions. The system test looks good, but I might need another look later.

@@ -145,13 +145,15 @@
*/
// TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix,
// this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig
@SuppressWarnings("WeakerAccess")
Contributor

A question more for my own information: why do we need this SuppressWarnings here?

Member Author

Not strictly. However, IntelliJ shows a warning that those members could be private because they are never used externally. It's just to get rid of those warnings. I can also revert if you insist.
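For context, a tiny hypothetical example of the IntelliJ inspection in question ("WeakerAccess", i.e. "declaration access can be weaker"):

// Without the suppression, IntelliJ flags public members that are never
// referenced outside the class and suggests making them private.
@SuppressWarnings("WeakerAccess")
public class ExampleConfig {
    public static final String SOME_SETTING = "some.setting"; // IDE: "could be private"
}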

if (streamThread.versionProbingFlag.get()) {
streamThread.versionProbingFlag.set(false);
} else {
taskManager.suspendTasksAndState();
Contributor

I'm probably missing something and brought this up before, but above in onPartitionsAssigned we create tasks with the assignment when not version probing. But in onPartitionsRevoked, if we are version probing, we flip the version probing flag; hence on assignment we create tasks. Why don't we flip the version probing flag in onPartitionsAssigned as an else statement on line 270, so we are only ever suspending and creating tasks during non-version-probing rebalances?

Member Author

onPartitionsAssigned: if the version probing flag is set, it means the assignment is empty and we want to trigger a new rebalance. If we called taskManager.createTasks(assignment);, we would close the suspended tasks, and that is what we do not want to do at this point, because we hope to get those tasks assigned back after the second rebalance.

onPartitionsRevoked: if the version probing flag is set, we don't want to suspend tasks either. The tasks are already suspended, but if we called taskManager.suspendTasksAndState(); again, we would lose the information about the currently suspended tasks (and we need to keep this information; i.e., we avoid an incorrect internal metadata update here).

The flow is the following:

  • trigger first rebalance
  • onPartitionsRevoked -> version probing flag not set: suspend tasks regularly
  • onPartitionsAssigned -> version probing flag set by StreamsPartitionAssignor: we skip task creation, as we will rebalance again (we cannot reset the flag here, because we need it in the next step)
  • trigger second rebalance
  • onPartitionsRevoked -> version probing flag is still set; we can reset the flag and skip suspending tasks to preserve the metadata
  • onPartitionsAssigned -> version probing flag not set: we do the regular assignment and start processing
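A minimal sketch of this flag handling (simplified, with hypothetical structure; the real listener delegates to the TaskManager), assuming the assignor sets the flag when it detects version probing:

import java.util.concurrent.atomic.AtomicBoolean;

final class RebalanceListenerSketch {
    // set to true by the partition assignor when it detects version probing
    final AtomicBoolean versionProbingFlag = new AtomicBoolean(false);

    void onPartitionsAssigned() {
        if (versionProbingFlag.get()) {
            // assignment is empty and a second rebalance follows -- do not close
            // the suspended tasks, and keep the flag for onPartitionsRevoked
            return;
        }
        // taskManager.createTasks(assignment);  // regular path
    }

    void onPartitionsRevoked() {
        if (versionProbingFlag.compareAndSet(true, false)) {
            // tasks are already suspended; suspending again would destroy
            // the bookkeeping about which tasks are currently suspended
            return;
        }
        // taskManager.suspendTasksAndState();   // regular path
    }
}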

Does this make sense?

Contributor

yep - thanks for the clarification

@@ -204,15 +220,15 @@ public void configure(final Map<String, ?> configs) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
userMetadataVersion = 1;
usedSubscriptionMetadataVersion = VERSION_ONE;
break;
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
Contributor

nit: use "{}.x." vs. string concatenation

@@ -52,6 +52,33 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
"streams_log.1": {
"path": LOG_FILE + ".1",
Contributor

Will the system test results still show errors when these files aren't found?

Member Author

Yes, we need to specify all used files here -- otherwise they won't be collected after the test finishes.

found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
if len(found) == self.leader_counter[p] + 1:
self.leader = p
self.leader_counter[p] = self.leader_counter[p] + 1
Contributor

Why the + 1? Is that for the leader to kick off version probing with a future version?

Member Author

If a processor is the leader, it will print Finished assignment for group in the log. A processor can be the leader multiple times, and thus we count in leader_counter[p] how many of those lines we have seen. To identify the new leader, its count must increase by exactly one. We increase by one as we have found one occurrence, as expected.
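The same rule as a minimal Java sketch (a hypothetical translation of the Python test logic above, for clarity):

import java.util.HashMap;
import java.util.Map;

final class LeaderDetectorSketch {
    // per-processor count of "Finished assignment for group" lines seen so far
    private final Map<String, Integer> leaderCounter = new HashMap<>();

    // returns the processor whose line count grew by exactly one, or null
    String detectLeader(final Map<String, Integer> grepCounts) {
        for (final Map.Entry<String, Integer> entry : grepCounts.entrySet()) {
            final int previous = leaderCounter.getOrDefault(entry.getKey(), 0);
            if (entry.getValue() == previous + 1) {
                leaderCounter.put(entry.getKey(), previous + 1);
                return entry.getKey();
            }
        }
        return null;
    }
}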

@bbejeck bbejeck left a comment

LGTM

@guozhangwang guozhangwang left a comment

Made another pass on the non-testing code, as I've reviewed the testing code before. Left some comments.

private final static int VERSION_THREE = 3;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
protected int minUserMetadataVersion = UNKNOWN;
protected Set<Integer> supportedVersions = new HashSet<>();
Contributor

This field is only used for testing purposes?

Member Author

For now, yes. However, after we bump the metadata version to 4, parts of the logic of FutureStreamsPartitionAssignor will move into StreamsPartitionAssignor, and then StreamsPartitionAssignor will need this, too.

Contributor

Ack.

@@ -602,6 +674,51 @@ public Subscription subscription(final Set<String> topics) {

return assignment;
}
private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
Contributor

nit: add an empty line between the methods.

for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {

final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks());
Contributor

We could skip if futureConsumers.contains(consumerId)?

if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion
&& receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {

if (info.version() == supportedVersion) {
Contributor

info.version() could be replaced with receivedAssignmentMetadataVersion

Contributor

Here's my reasoning of the cases:

  1. receivedAssignmentMetadataVersion > supportedVersion: this should never happen.
  2. receivedAssignmentMetadataVersion == supportedVersion: normal case, the leader only knows up to supportedVersion, and hence sends this version back.
  3. receivedAssignmentMetadataVersion < supportedVersion: some other consumer used a version even older than supportedVersion; in this case, this consumer will again send its subscription with supportedVersion.

So it seems we do not need to distinguish 2) and 3), since in either case line 763 and line 770 will actually assign usedSubscriptionMetadataVersion = supportedVersion, right?

Or do you just want to distinguish the log entries? If that's the case, I think we could simplify this to:

if (receivedAssignmentMetadataVersion > supportedVersion) {
    // throw runtime exception
} else {
    if (receivedAssignmentMetadataVersion == supportedVersion) {
        // log normally
    } else {
        // log differently
    }

    usedSubscriptionMetadataVersion = supportedVersion;
}

Member Author

I think unifying the assignment to usedSubscriptionMetadataVersion into a single line is harder for humans to read.
Using

usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion

makes clear that it's a downgrade, while

usedSubscriptionMetadataVersion = supportedVersion

makes clear that it's an upgrade.

I applied the other suggestions, though.

final int supportedVersion = info.latestSupportedVersion();

if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion
&& receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
Contributor

receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION should always be guaranteed on the server side, right? If that is true, I'd suggest we refactor it as:

if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion) {
    if (receivedAssignmentMetadataVersion < EARLIEST_PROBEABLE_VERSION) {
        // throw illegal state exception.
    }

    // .. below logic
}

So that we can detect potential bugs.

Member Author

This code is actually correct. If I change it as suggested, it breaks the manual upgrade path from version 1/2 to version 3. In this case, a received version can be smaller than the used subscription version and smaller than EARLIEST_PROBEABLE_VERSION.

Contributor

Ah, you're right. My understanding is that to manually upgrade from version 1/2 to version 3, we set the upgrade.from config accordingly, so in the first rebalance everyone uses version 1/2 in the SubscriptionInfo and AssignmentInfo; then in the second rebalance some instances send SubscriptionInfo with version 3 and some send version 1/2 (they have not been bounced yet), so AssignmentInfo with version 1/2 is sent back again.

processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
case 3:
case VERSION_THREE:
final int latestSupportedVersionGroupLeader = info.latestSupportedVersion();
Contributor

We can reuse supportedVersion from line 753 above.

Actually, how about renaming that field to latestLeaderSupportedVersion?

}

private ClientState(Set<TaskId> activeTasks, Set<TaskId> standbyTasks, Set<TaskId> assignedTasks, Set<TaskId> prevActiveTasks, Set<TaskId> prevAssignedTasks, int capacity) {
private ClientState(final Set<TaskId> activeTasks,
Contributor

The added prevStandbyTasks seems not to be set anywhere? I.e., it will always be an empty hash set?

Member Author

It is set in

public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
    prevStandbyTasks.addAll(standbyTasks);
    prevAssignedTasks.addAll(standbyTasks);
}

@guozhangwang

@mjsax Please feel free to merge after those comments are addressed. I have no more feedback.


mjsax commented May 31, 2018

Retest this please.

@mjsax mjsax merged commit d166485 into apache:trunk May 31, 2018
@mjsax mjsax deleted the kafka-6054-fix-upgrade-system-test branch June 5, 2018 23:48
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
This PR fixes some regressions introduced into the streams system tests and sets the upgrade tests to ignore until PR apache#4636 is merged, as it has the fixes for the upgrade tests.

Reviewers: Guozhang Wang <[email protected]>
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018