-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15544: Enable integration tests for new consumer #14758
Conversation
Merge commits from trunk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @AndrewJSchofield!
def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = { | ||
val metricName = broker.metrics.metricName("throttle-time", | ||
quotaType.toString, | ||
"", | ||
"user", "", | ||
"client-id", clientId) | ||
assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) | ||
def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = { | ||
val metricName = broker.metrics.metricName("throttle-time", | ||
quotaType.toString, | ||
"", | ||
"user", "", | ||
"client-id", clientId) | ||
assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) | ||
} | ||
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) | ||
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) | ||
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) | ||
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) | ||
brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) | ||
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) | ||
brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) | ||
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) | ||
|
||
servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) | ||
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) | ||
servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) | ||
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) | ||
brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) | ||
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) | ||
brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) | ||
brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is incorrect in trunk
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this context, servers
and brokers
are interchangeable. This change makes the test work for ZK or KRaft. Previously, it was ZK-only.
@@ -66,6 +66,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest { | |||
|
|||
|
|||
override protected def brokerPropertyOverrides(properties: Properties): Unit = { | |||
properties.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true") // enable KIP-848 group coordinator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At some point we need to test that if the broker's configuration has this set to false
that the client won't up and explode. That's a future task...
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package kafka.api | ||
|
||
import kafka.utils.TestUtils.waitUntilTrue | ||
import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull, assertTrue} | ||
import org.junit.jupiter.api.Test | ||
|
||
import java.time.Duration | ||
import scala.jdk.CollectionConverters._ | ||
|
||
class BaseAsyncConsumerTest extends AbstractConsumerTest { | ||
val defaultBlockingAPITimeoutMs = 1000 | ||
|
||
@Test | ||
def testCommitAsync(): Unit = { | ||
val consumer = createAsyncConsumer() | ||
val producer = createProducer() | ||
val numRecords = 10000 | ||
val startingTimestamp = System.currentTimeMillis() | ||
val cb = new CountConsumerCommitCallback | ||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) | ||
consumer.assign(List(tp).asJava) | ||
consumer.commitAsync(cb) | ||
waitUntilTrue(() => { | ||
cb.successCount == 1 | ||
}, "wait until commit is completed successfully", defaultBlockingAPITimeoutMs) | ||
val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs)) | ||
assertNotNull(committedOffset) | ||
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to | ||
// tp. The committed offset should be null. This is intentional. | ||
assertNull(committedOffset.get(tp)) | ||
assertTrue(consumer.assignment.contains(tp)) | ||
} | ||
|
||
@Test | ||
def testCommitSync(): Unit = { | ||
val consumer = createAsyncConsumer() | ||
val producer = createProducer() | ||
val numRecords = 10000 | ||
val startingTimestamp = System.currentTimeMillis() | ||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) | ||
consumer.assign(List(tp).asJava) | ||
consumer.commitSync() | ||
val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs)) | ||
assertNotNull(committedOffset) | ||
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to | ||
// tp. The committed offset should be null. This is intentional. | ||
assertNull(committedOffset.get(tp)) | ||
assertTrue(consumer.assignment.contains(tp)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good riddance!
@Test | ||
def testSimpleConsumption(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic", "consumer")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to have a different type of 'source' that can be defined once vs. on each test? More of a nit, but curious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, but I want to be able to have a different array for each test to enable them to be turned on individually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been replaced with a MethodSource
that is capable of returning whatever combination we want.
val partition = 0; | ||
val partition = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ain't Java, come on!
TestUtils.waitUntilTrue(() => { | ||
this.zkClient.topicExists(topic) | ||
}, "Failed to create topic") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we need this now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced we ever did. Again, this is ZK-specific. The tests work on both variants without this check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @AndrewJSchofield . Method createTopics()
already verifies that the topic was created.
@philipnee can you tag with |
2cec36d
to
951e05d
Compare
Converting back to draft. This PR depends on #14801 being merged. This is ready for review, but will not build cleanly until the other PR lands. |
951e05d
to
54068f5
Compare
Done! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @AndrewJSchofield !
Here my comments.
@Test | ||
def testAutoCommitOnCloseAfterWakeup(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a transient state, isn't it? At least https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close is planned. If KAFKA-15327 is still valid, can we formulate this comment accordingly like close() is not committing offsets in consumer group protocol for now but it should when implementation is complete
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the idea, but I will use a less wordy formulation such as "temporarily". I expect a bunch of these will be resolved in the next two weeks and we can run most of the tests across both consumers.
@Test | ||
def testPartitionsForInvalidTopic(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment above about the functionality being supported in future.
@Test | ||
def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the ConsumerRebalanceListener
is also something that will be supported in the future by the consumer group protocol.
@Test | ||
def testMultiConsumerSessionTimeoutOnClose(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
@Test | ||
def testInterceptors(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // Consumer interceptors not implemented for consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will also interceptors be supported?
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to | ||
// tp. The committed offset should be null. This is intentional. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you make this clear in the name of the test instead of using a inline comment?
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to | ||
// tp. The committed offset should be null. This is intentional. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment above
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(strings = Array("generic")) // partitionsFor not implemented in consumer group protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my other comments about functionality that will be supported in the future.
def testConsumingWithEmptyGroupId(): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("consumer")) // Null group ID not supported with consumer group protocol | ||
def testNullGroupIdNotSupported(groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name suggests that a group ID of null
is not supported in general with manual assignment. However, null
is supported as group ID with manual assignment. Just commits are not supported with group ID = null
. Could you please use a more descriptive name for this test like testNullGroupIdNotSupportedIfCommitting
?
|
||
// Check committed offsets twice with same consumer | ||
assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) | ||
assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better to consumer and commit some records before retrieving the committed offsets a second time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't write this test and there's no comment by the author. I suppose this was done specifically to catch a problem and I prefer not to mess around with it when I don't understand.
@AndrewJSchofield We are about to merge #14781. Is it possible to build this one on top of it? |
I just merged #14781. Thanks! |
54068f5
to
4468494
Compare
This PR now reflects the changes in KAFKA-14781 and also tests the new consumer. |
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
Outdated
Show resolved
Hide resolved
val numRecords = 10000 | ||
val producer = createProducer() | ||
val startingTimestamp = System.currentTimeMillis() | ||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) | ||
|
||
this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding this change to all the tests is a bit annoying. Have you considered adding this to IntegrationTestHarness.doSetup
or in createConsumer
? We could infer it like we did with isNewGroupCoordinatorEnabled()
in the same class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did consider that. I have already endured the annoyance. I'll take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this line now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scala 2.12 build needs to be fixed, and I left some minor comments. But generally this looks good to me
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
Outdated
Show resolved
Hide resolved
val numRecords = 10000 | ||
val producer = createProducer() | ||
val startingTimestamp = System.currentTimeMillis() | ||
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) | ||
|
||
this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this line now?
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
Outdated
Show resolved
Hide resolved
CI is blocked by a hanging test bug that is going to celebrate its fourth birthday soon: https://issues.apache.org/jira/browse/KAFKA-9470. I restarted CI and submitted #14855 |
I created 3 new flaky test tickets, but all failed tests were flaky before on master. |
This commit parameterizes the consumer integration tests so they can be run against the existing "generic" group protocol and the new "consumer" group protocol introduced in KIP-848. The KIP-848 client code is under construction so some of the tests do not run on both variants to start with, but the idea is that the tests can be enabled as the gaps in functionality are closed. Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
This commit parameterizes the consumer integration tests so they can be run against the existing "generic" group protocol and the new "consumer" group protocol introduced in KIP-848. The KIP-848 client code is under construction so some of the tests do not run on both variants to start with, but the idea is that the tests can be enabled as the gaps in functionality are closed. Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
This commit parameterizes the consumer integration tests so they can be run against the existing "generic" group protocol and the new "consumer" group protocol introduced in KIP-848. The KIP-848 client code is under construction so some of the tests do not run on both variants to start with, but the idea is that the tests can be enabled as the gaps in functionality are closed. Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
This commit parameterizes the consumer integration tests so they can be run against the existing "generic" group protocol and the new "consumer" group protocol introduced in KIP-848. The KIP-848 client code is under construction so some of the tests do not run on both variants to start with, but the idea is that the tests can be enabled as the gaps in functionality are closed. Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
This PR parameterizes the consumer integration tests so they can be run against
the existing "generic" group protocol and the new "consumer" group protocol
introduced in KIP-848.
The KIP-848 client code is under construction so some of the tests do not run on
both variants to start with, but the idea is that the tests can be enabled as the gaps
in functionality are closed.
Committer Checklist (excluded from commit message)