Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12394: Return TOPIC_AUTHORIZATION_FAILED in delete topic response if no describe permission #10223

Merged
merged 3 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
MINOR: Do not expose topic name in DeleteTopic response if no descr…
…ibe permission
  • Loading branch information
hachikuji committed Feb 26, 2021
commit ef340d3932b4cff30a790befae10836c819d8bb4
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ class ControllerContext {
}.keySet
}

def topicName(topicId: Uuid): Option[String] = {
topicNames.get(topicId)
}

def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()

def partitionWithLeadersCount: Int = partitionLeadershipInfo.size
Expand Down
34 changes: 19 additions & 15 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
chia7712 marked this conversation as resolved.
Show resolved Hide resolved
results.add(new DeletableTopicResult()
.setName(name)
.setTopicId(topic.topicId()))
Expand All @@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
results.forEach { topic =>
val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
topic.setErrorMessage("Topic IDs are not supported on the server.")
} else if (unresolvedTopicId)
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
else if (!authorizedDeleteTopics.contains(topic.name))
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
else if (!metadataCache.contains(topic.name))
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
else
toDelete += topic.name
val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
topic.setErrorMessage("Topic IDs are not supported on the server.")
} else if (unresolvedTopicId) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
hachikuji marked this conversation as resolved.
Show resolved Hide resolved
// Because the client does not have Describe permission, the name should
// not be returned in the response.
topic.setName(null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that the Name in the schema [1] used mapKey: true. I wonder if we should remove it now that Name is nullable. Usually, when we use mapKey, we expect to be able to query with the name and this is not always the case now. It seems that we don't rely on it in the admin client but we do in tests. What do you think?

[1] https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DeleteTopicsResponse.json#L41

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I agree using mapKey makes less sense now. Let me try it out. If the diff is not too bad, we can do it here. Otherwise, we can do it separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have one dependence here which makes this a little more work than I wanted to do here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1909. I filed a separate JIRA so that we don't forget about it: https://issues.apache.org/jira/browse/KAFKA-12395.

topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prepared to discuss this error code on #10184 (comment) :)

return UNKNOWN_TOPIC_ID

Could you add clear error message to this response (call setErrorMessage)? I can imagine users get confusing for the error code UNKNOWN_TOPIC_ID because it presents two scenarios now (the id does not exist or you have no permission to describe topic).

return TOPIC_AUTHORIZATION_FAILED

It indicates accurate error and it can help user handle un-authorized requests. Personally, this error is more suitable :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue with TOPIC_AUTHORIZATION_FAILED is that we are returning a different error message than the case where the topic ID does not exists and we are implying the existence of a topic when we should not be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the comment before fully realizing how topic IDs were complicating the matter. I filed this JIRA to discuss further: https://issues.apache.org/jira/browse/KAFKA-12394. I'd suggest that we keep the current behavior in this PR and fix the small issue. It would be good to have the tests in any case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 @jolshan I ended up doing this here after all since it sounds like there is consensus on not treating the topicId as sensitive based on the JIRA discussion. Let me know if you have any concerns.

} else if (!authorizedDeleteTopics.contains(topic.name)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to #10184 (comment), should it handle the case name provided, topic missing, describable => UNKNOWN_TOPIC_OR_PARTITION?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there are really two sub-cases. Here's how they are currently handled:

name provided, topic missing, describable, deletable => UNKNOWN_TOPIC_OR_PARTITION
name provided, topic missing, describable, undeletable => TOPIC_AUTHORIZATION_FAILED

This seems defensible to me. The UNKNOWN_TOPIC_OR_PARTITION error will cause the client to retry because of the possibility of stale metadata, but it can't delete the topic anyway because of the authorization failure. It seems better to fail fast?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems defensible to me. The UNKNOWN_TOPIC_OR_PARTITION error will cause the client to retry because of the possibility of stale metadata, but it can't delete the topic anyway because of the authorization failure. It seems better to fail fast?

That makes sense to me. We have to make sure this rule is applied to #10184 :)

topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
} else if (!metadataCache.contains(topic.name)) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
} else {
toDelete += topic.name
}
}
// If no authorized topics return immediately
if (toDelete.isEmpty)
Expand Down
159 changes: 158 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Properties, Random}

import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
import kafka.controller.KafkaController
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
Expand Down Expand Up @@ -75,6 +75,8 @@ import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.{ArgumentMatchers, Mockito}

import scala.annotation.nowarn
Expand Down Expand Up @@ -3479,6 +3481,161 @@ class KafkaApisTest {
assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
}

@Test
def testDeleteTopicsByIdAuthorization(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])

EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
EasyMock.anyObject(classOf[RequestChannel.Request]),
EasyMock.anyShort()
)).andReturn(UnboundedControllerMutationQuota)
EasyMock.expect(controller.isActive).andReturn(true)
EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)

// Try to delete three topics:
// 1. One without describe permission
// 2. One without delete permission
// 3. One which is authorized, but doesn't exist

expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
"foo" -> AuthorizationResult.DENIED,
"bar" -> AuthorizationResult.ALLOWED
))

expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
"foo" -> AuthorizationResult.DENIED,
"bar" -> AuthorizationResult.DENIED
))

val topicIdsMap = Map(
Uuid.randomUuid() -> Some("foo"),
Uuid.randomUuid() -> Some("bar"),
Uuid.randomUuid() -> None
)

topicIdsMap.foreach { case (topicId, topicNameOpt) =>
EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
}

val topicDatas = topicIdsMap.keys.map { topicId =>
new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
}.toList
val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
.setTopics(topicDatas.asJava))
.build(ApiKeys.DELETE_TOPICS.latestVersion)

val request = buildRequest(deleteRequest)
val capturedResponse = expectNoThrottling(request)

EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
requestChannel, txnCoordinator, controller, controllerContext, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)

val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]

topicIdsMap.foreach { case (topicId, nameOpt) =>
val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
nameOpt match {
case Some("foo") =>
assertNull(response.name)
assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
case Some("bar") =>
assertEquals("bar", response.name)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
case None =>
assertNull(response.name)
assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
case _ =>
fail("Unexpected topic id/name mapping")
}
}
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])

EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
EasyMock.anyObject(classOf[RequestChannel.Request]),
EasyMock.anyShort()
)).andReturn(UnboundedControllerMutationQuota)
EasyMock.expect(controller.isActive).andReturn(true)

// Try to delete three topics:
// 1. One without describe permission
// 2. One without delete permission
// 3. One which is authorized, but doesn't exist

expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
"foo" -> AuthorizationResult.DENIED,
"bar" -> AuthorizationResult.ALLOWED,
"baz" -> AuthorizationResult.ALLOWED
))

expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
"foo" -> AuthorizationResult.DENIED,
"bar" -> AuthorizationResult.DENIED,
"baz" -> AuthorizationResult.ALLOWED
))

val deleteRequest = if (usePrimitiveTopicNameArray) {
new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
.setTopicNames(List("foo", "bar", "baz").asJava))
.build(5.toShort)
} else {
val topicDatas = List(
new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
)
new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
.setTopics(topicDatas.asJava))
.build(ApiKeys.DELETE_TOPICS.latestVersion)
}

val request = buildRequest(deleteRequest)
val capturedResponse = expectNoThrottling(request)

EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
requestChannel, txnCoordinator, controller, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)

val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]

def lookupErrorCode(topic: String): Option[Errors] = {
Option(deleteResponse.data.responses().find(topic))
.map(result => Errors.forCode(result.errorCode))
}

assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo"))
assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar"))
assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
}

def expectTopicAuthorization(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we simplify this method using AuthHelperTest.matchSameElements().

Copy link
Contributor Author

@hachikuji hachikuji Mar 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at doing this, but the need to preserve order in the result makes it just as cumbersome. The capture also provides some type safety and avoids the need for casting from the argument list.

authorizer: Authorizer,
aclOperation: AclOperation,
topicResults: Map[String, AuthorizationResult]
): Unit = {
val expectedActions = topicResults.keys.map { topic =>
val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
topic -> new Action(aclOperation, pattern, 1, true, true)
}.toMap

val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture()
EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.capture(actionsCapture)))
.andAnswer(() => {
actionsCapture.getValue.asScala.map { action =>
val topic = action.resourcePattern.name
assertEquals(expectedActions(topic), action)
topicResults(topic)
}.asJava
})
.once()
}

private def createMockRequest(): RequestChannel.Request = {
val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])
Expand Down