Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15544: Enable integration tests for new consumer #14758

Merged
merged 13 commits into from
Nov 28, 2023

Conversation

AndrewJSchofield
Copy link
Collaborator

This PR parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.

The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Merge commits from trunk
@AndrewJSchofield AndrewJSchofield marked this pull request as draft November 14, 2023 17:02
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @AndrewJSchofield!

Comment on lines 1672 to 1833
def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = {
val metricName = broker.metrics.metricName("throttle-time",
quotaType.toString,
"",
"user", "",
"client-id", clientId)
assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = {
val metricName = broker.metrics.metricName("throttle-time",
quotaType.toString,
"",
"user", "",
"client-id", clientId)
assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
}
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))
brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))

servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is incorrect in trunk, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this context, servers and brokers are interchangeable. This change makes the test work for ZK or KRaft. Previously, it was ZK-only.

@@ -66,6 +66,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {


override protected def brokerPropertyOverrides(properties: Properties): Unit = {
properties.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true") // enable KIP-848 group coordinator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we need to test that if the broker's configuration has this set to false that the client won't up and explode. That's a future task...

Comment on lines 1 to 70
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api

import kafka.utils.TestUtils.waitUntilTrue
import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.Test

import java.time.Duration
import scala.jdk.CollectionConverters._

class BaseAsyncConsumerTest extends AbstractConsumerTest {
val defaultBlockingAPITimeoutMs = 1000

@Test
def testCommitAsync(): Unit = {
val consumer = createAsyncConsumer()
val producer = createProducer()
val numRecords = 10000
val startingTimestamp = System.currentTimeMillis()
val cb = new CountConsumerCommitCallback
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
consumer.assign(List(tp).asJava)
consumer.commitAsync(cb)
waitUntilTrue(() => {
cb.successCount == 1
}, "wait until commit is completed successfully", defaultBlockingAPITimeoutMs)
val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs))
assertNotNull(committedOffset)
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
assertNull(committedOffset.get(tp))
assertTrue(consumer.assignment.contains(tp))
}

@Test
def testCommitSync(): Unit = {
val consumer = createAsyncConsumer()
val producer = createProducer()
val numRecords = 10000
val startingTimestamp = System.currentTimeMillis()
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
consumer.assign(List(tp).asJava)
consumer.commitSync()
val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs))
assertNotNull(committedOffset)
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
assertNull(committedOffset.get(tp))
assertTrue(consumer.assignment.contains(tp))
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good riddance!

@Test
def testSimpleConsumption(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic", "consumer"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have a different type of 'source' that can be defined once vs. on each test? More of a nit, but curious.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, but I want to be able to have a different array for each test to enable them to be turned on individually.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been replaced with a MethodSource that is capable of returning whatever combination we want.

Comment on lines -1811 to +1957
val partition = 0;
val partition = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ain't Java, come on!

Comment on lines 1815 to 1882
TestUtils.waitUntilTrue(() => {
this.zkClient.topicExists(topic)
}, "Failed to create topic")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need this now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced we ever did. Again, this is ZK-specific. The tests work on both variants without this check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @AndrewJSchofield . Method createTopics() already verifies that the topic was created.

@kirktrue
Copy link
Collaborator

@philipnee can you tag with ctr and KIP-848 🥺

@AndrewJSchofield AndrewJSchofield marked this pull request as ready for review November 20, 2023 16:58
@AndrewJSchofield AndrewJSchofield marked this pull request as draft November 21, 2023 09:05
@AndrewJSchofield
Copy link
Collaborator Author

Converting back to draft. This PR depends on #14801 being merged. This is ready for review, but will not build cleanly until the other PR lands.

@kirktrue
Copy link
Collaborator

@cadonna / @lucasbru—can we add the ctr label to this PR?

@cadonna cadonna added the ctr Consumer Threading Refactor (KIP-848) label Nov 21, 2023
@cadonna
Copy link
Contributor

cadonna commented Nov 21, 2023

Done!

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @AndrewJSchofield !

Here my comments.

@Test
def testAutoCommitOnCloseAfterWakeup(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a transient state, isn't it? At least https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close is planned. If KAFKA-15327 is still valid, can we formulate this comment accordingly like close() is not committing offsets in consumer group protocol for now but it should when implementation is complete.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the idea, but I will use a less wordy formulation such as "temporarily". I expect a bunch of these will be resolved in the next two weeks and we can run most of the tests across both consumers.

@Test
def testPartitionsForInvalidTopic(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above about the functionality being supported in future.

@Test
def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the ConsumerRebalanceListener is also something that will be supported in the future by the consumer group protocol.

@Test
def testMultiConsumerSessionTimeoutOnClose(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

@Test
def testInterceptors(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("generic")) // Consumer interceptors not implemented for consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will also interceptors be supported?

Comment on lines +2103 to +2147
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this clear in the name of the test instead of using a inline comment?

Comment on lines +2123 to +2167
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above

}

@ParameterizedTest
@ValueSource(strings = Array("generic")) // partitionsFor not implemented in consumer group protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comments about functionality that will be supported in the future.

def testConsumingWithEmptyGroupId(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("consumer")) // Null group ID not supported with consumer group protocol
def testNullGroupIdNotSupported(groupProtocol: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name suggests that a group ID of null is not supported in general with manual assignment. However, null is supported as group ID with manual assignment. Just commits are not supported with group ID = null. Could you please use a more descriptive name for this test like testNullGroupIdNotSupportedIfCommitting?


// Check committed offsets twice with same consumer
assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to consumer and commit some records before retrieving the committed offsets a second time?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't write this test and there's no comment by the author. I suppose this was done specifically to catch a problem and I prefer not to mess around with it when I don't understand.

@dajac
Copy link
Contributor

dajac commented Nov 22, 2023

@AndrewJSchofield We are about to merge #14781. Is it possible to build this one on top of it?

@AndrewJSchofield
Copy link
Collaborator Author

@dajac I suggest you simply merge #14871 and let me deal with the fall-out. This one is not going to be far behind.

@dajac
Copy link
Contributor

dajac commented Nov 23, 2023

I just merged #14781. Thanks!

@AndrewJSchofield
Copy link
Collaborator Author

This PR now reflects the changes in KAFKA-14781 and also tests the new consumer.

val numRecords = 10000
val producer = createProducer()
val startingTimestamp = System.currentTimeMillis()
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)

this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this change to all the tests is a bit annoying. Have you considered adding this to IntegrationTestHarness.doSetup or in createConsumer? We could infer it like we did with isNewGroupCoordinatorEnabled() in the same class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did consider that. I have already endured the annoyance. I'll take a look.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this line now?

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala 2.12 build needs to be fixed, and I left some minor comments. But generally this looks good to me

val numRecords = 10000
val producer = createProducer()
val startingTimestamp = System.currentTimeMillis()
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)

this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this line now?

@lucasbru
Copy link
Member

CI is blocked by a hanging test bug that is going to celebrate its fourth birthday soon: https://issues.apache.org/jira/browse/KAFKA-9470. I restarted CI and submitted #14855

@lucasbru
Copy link
Member

I created 3 new flaky test tickets, but all failed tests were flaky before on master.

@lucasbru lucasbru merged commit 161b94d into apache:trunk Nov 28, 2023
1 check failed
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-15544 branch December 3, 2023 22:13
ex172000 pushed a commit to ex172000/kafka that referenced this pull request Dec 15, 2023
This commit parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.

The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
This commit parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.

The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
This commit parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.

The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
This commit parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.

The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ctr Consumer Threading Refactor (KIP-848)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants