Skip to content

Commit

Permalink
MINOR: Validate the KRaft controllerListener config on startup (apach…
Browse files Browse the repository at this point in the history
…e#11070)

Reviewers: Colin P. McCabe <[email protected]>, David Arthur <[email protected]>
  • Loading branch information
niket-goel authored Jul 21, 2021
1 parent e0f8eda commit 6dd425e
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 2 deletions.
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.common.config.ConfigException

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -137,8 +138,13 @@ class ControllerServer(
credentialProvider,
apiVersionManager)
socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))

if (config.controllerListeners.nonEmpty) {
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))
} else {
throw new ConfigException("No controller.listener.names defined for controller");
}

val configDefs = Map(ConfigResource.Type.BROKER -> KafkaConfig.configDef,
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy).asJava
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")

val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ class ConnectionQuotasTest {
val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
val props = brokerPropsWithDefaultConnectionLimits
props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
val config = KafkaConfig.fromProps(props)
connectionQuotas = new ConnectionQuotas(config, time, metrics)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ class SocketServerTest {
def testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend(): Unit = {
val serverMetrics = new Metrics
@volatile var selector: TestableSelector = null
props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
val overrideServer = new SocketServer(
KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider, apiVersionManager
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class ControllerApisTest {
props: Properties = new Properties()): ControllerApis = {
props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
new ControllerApis(
requestChannel,
authorizer,
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ class KafkaConfigTest {
assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol)
}

@Test
def testControllerListenerDefined(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
props.put(KafkaConfig.NodeIdProp, "1")
assertFalse(isValidKafkaConfig(props))
val caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("controller.listener.names cannot be empty if the server has the controller role"))

props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
assertTrue(isValidKafkaConfig(props))
}

@Test
def testBadListenerProtocol(): Unit = {
val props = new Properties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://127.0.0.1:9092")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")

val (loadedMetaProperties, offlineDirs) =
invokeLoadMetaProperties(metaProperties, configProperties)
Expand All @@ -58,6 +60,7 @@ class KafkaRaftServerTest {

configProperties.put(KafkaConfig.ProcessRolesProp, "controller")
configProperties.put(KafkaConfig.NodeIdProp, configNodeId.toString)
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")

assertThrows(classOf[InconsistentNodeIdException], () =>
invokeLoadMetaProperties(metaProperties, configProperties))
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class StorageToolTest {
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
properties.setProperty(KafkaConfig.NodeIdProp, "2")
properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
properties
}

Expand Down

0 comments on commit 6dd425e

Please sign in to comment.