Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 #10450

Merged
Prev Previous commit
Next Next commit
Move AuthorizerUtils.validateAclBinding to AclAuthorizer
Also convert a `toList` to `toBuffer`
  • Loading branch information
ijuma committed Mar 31, 2021
commit b93ed2edb82699bbfc9617ebbc1ab63c0ec73a97
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}

import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
Expand Down Expand Up @@ -118,9 +117,16 @@ object AclAuthorizer {
zkClientConfig
}
}

private def validateAclBinding(aclBinding: AclBinding): Unit = {
if (aclBinding.isUnknown)
throw new IllegalArgumentException("ACL binding contains unknown elements")
}
}

class AclAuthorizer extends Authorizer with Logging {
import kafka.security.authorizer.AclAuthorizer._

private[security] val authorizerLogger = Logger("kafka.authorizer.logger")
private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false
Expand Down Expand Up @@ -200,7 +206,7 @@ class AclAuthorizer extends Authorizer with Logging {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}
AuthorizerUtils.validateAclBinding(aclBinding)
validateAclBinding(aclBinding)
true
} catch {
case e: Throwable =>
Expand All @@ -225,7 +231,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}
}
results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka.security.authorizer
import java.net.InetAddress

import kafka.network.RequestChannel.Session
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
Expand All @@ -29,11 +28,6 @@ import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorize

object AuthorizerUtils {


def validateAclBinding(aclBinding: AclBinding): Unit = {
if (aclBinding.isUnknown)
throw new IllegalArgumentException("ACL binding contains unknown elements")
}
def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])

def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
Expand Down