Skip to content

Commit

Permalink
KAFKA-12598: ConfigCommand should only support communication via ZooK…
Browse files Browse the repository at this point in the history
…eeper for a reduced set of cases (apache#10811)

Checked the documentation, we must use `--zookeeper` option in 3 places (alter and describe):
1. user configs where the config is a SCRAM mechanism name (i.e. a SCRAM credential for a user)
2. update broker configs for a particular broker when that broker is down
3. broker default configs when all brokers are down

Reference:
1. [config SCRAM Credentials](https://kafka.apache.org/documentation/#security_sasl_scram_credentials)
2. [Update config before broker started](https://kafka.apache.org/documentation/#dynamicbrokerconfigs)

So, after this PR, we only support `--zookeeper` on `users` and `brokers` entity. Add some argument parse rules and tests. 

Reviewers: Ron Dagostino <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
showuon authored Jul 20, 2021
1 parent 4423a54 commit b615904
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 99 deletions.
136 changes: 83 additions & 53 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,25 @@ import scala.collection._
* when describing or altering default configuration for users, clients, brokers, or ips, respectively.
* Alternatively, --user-defaults, --client-defaults, --broker-defaults, or --ip-defaults may be specified in place of
* --entity-type <users|clients|brokers|ips> --entity-default, respectively.
*
* For most use cases, this script communicates with a kafka cluster (specified via the
* `--bootstrap-server` option). There are three exceptions where direct communication with a
* ZooKeeper ensemble (specified via the `--zookeeper` option) is allowed:
*
* 1. Describe/alter user configs where the config is a SCRAM mechanism name (i.e. a SCRAM credential for a user)
* 2. Describe/alter broker configs for a particular broker when that broker is down
* 3. Describe/alter broker default configs when all brokers are down
*
* For example, this allows password configs to be stored encrypted in ZK before brokers are started,
* avoiding cleartext passwords in `server.properties`.
*/
object ConfigCommand extends Config {

val BrokerDefaultEntityName = ""
val BrokerLoggerConfigType = "broker-loggers"
val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
val ZkSupportedConfigTypes = ConfigType.all
val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
val DefaultScramIterations = 4096
// Dynamic broker configs can only be updated using the new AdminClient once brokers have started
// so that configs may be fully validated. Prior to starting brokers, updates may be performed using
// ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK,
// avoiding clear passwords in server.properties. For consistency with older versions, quota-related
// broker configs can still be updated using ZooKeeper at any time. ConfigCommand will be migrated
// to the new AdminClient later for these configs (KIP-248).
val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set(
DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)

def main(args: Array[String]): Unit = {
try {
Expand Down Expand Up @@ -132,26 +133,24 @@ object ConfigCommand extends Config {
val entity = parseEntity(opts)
val entityType = entity.root.entityType
val entityName = entity.fullSanitizedName

if (entityType == ConfigType.User)
val errorMessage = s"--bootstrap-server option must be specified to update $entityType configs: {add: $configsToBeAdded, delete: $configsToBeDeleted}"

if (entityType == ConfigType.User) {
if (!configsToBeAdded.isEmpty || !configsToBeDeleted.isEmpty) {
val info = "User configuration updates using ZooKeeper are only supported for SCRAM credential updates."
val scramMechanismNames = ScramMechanism.values.map(_.mechanismName)
// make sure every added/deleted configs are SCRAM related, other configs are not supported using zookeeper
require(configsToBeAdded.stringPropertyNames.asScala.forall(scramMechanismNames.contains),
s"$errorMessage. $info")
require(configsToBeDeleted.forall(scramMechanismNames.contains), s"$errorMessage. $info")
}
preProcessScramCredentials(configsToBeAdded)
else if (entityType == ConfigType.Broker) {
// Replication quota configs may be updated using ZK at any time. Other dynamic broker configs
// may be updated using ZooKeeper only if the corresponding broker is not running. Dynamic broker
// configs at cluster-default level may be configured using ZK only if there are no brokers running.
val dynamicBrokerConfigs = configsToBeAdded.asScala.keySet.filterNot(BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning.contains)
if (dynamicBrokerConfigs.nonEmpty) {
} else if (entityType == ConfigType.Broker) {
// Dynamic broker configs can be updated using ZooKeeper only if the corresponding broker is not running.
if (!configsToBeAdded.isEmpty || !configsToBeDeleted.isEmpty) {
validateBrokersNotRunning(entityName, adminZkClient, zkClient, errorMessage)

val perBrokerConfig = entityName != ConfigEntityName.Default
val errorMessage = s"--bootstrap-server option must be specified to update broker configs $dynamicBrokerConfigs."
val info = "Broker configuration updates using ZooKeeper are supported for bootstrapping before brokers" +
" are started to enable encrypted password configs to be stored in ZooKeeper."
if (perBrokerConfig) {
adminZkClient.parseBroker(entityName).foreach { brokerId =>
require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage when broker $entityName is running. $info")
}
} else {
require(zkClient.getAllBrokersInCluster.isEmpty, s"$errorMessage for default cluster if any broker is running. $info")
}
preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
}
}
Expand All @@ -172,6 +171,22 @@ object ConfigCommand extends Config {
println(s"Completed updating config for entity: $entity.")
}

private def validateBrokersNotRunning(entityName: String,
adminZkClient: AdminZkClient,
zkClient: KafkaZkClient,
errorMessage: String): Unit = {
val perBrokerConfig = entityName != ConfigEntityName.Default
val info = "Broker configuration operations using ZooKeeper are only supported if the affected broker(s) are not running."
if (perBrokerConfig) {
adminZkClient.parseBroker(entityName).foreach { brokerId =>
require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage - broker $brokerId is running. $info")
}
} else {
val runningBrokersCount = zkClient.getAllBrokersInCluster.size
require(runningBrokersCount == 0, s"$errorMessage - $runningBrokersCount brokers are running. $info")
}
}

private def preProcessScramCredentials(configsToBeAdded: Properties): Unit = {
def scramCredential(mechanism: ScramMechanism, credentialStr: String): String = {
val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
Expand Down Expand Up @@ -236,9 +251,17 @@ object ConfigCommand extends Config {
}
}

private def describeConfigWithZk(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
private[admin] def describeConfigWithZk(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
val entityType = configEntity.root.entityType
val describeAllUsers = entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
val entityName = configEntity.fullSanitizedName
val errorMessage = s"--bootstrap-server option must be specified to describe $entityType"
if (entityType == ConfigType.Broker) {
// Dynamic broker configs can be described using ZooKeeper only if the corresponding broker is not running.
validateBrokersNotRunning(entityName, adminZkClient, zkClient, errorMessage)
}

val entities = configEntity.getAllEntities(zkClient)
for (entity <- entities) {
val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, entity.fullSanitizedName)
Expand Down Expand Up @@ -733,10 +756,11 @@ object ConfigCommand extends Config {
class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {

val zkConnectOpt = parser.accepts("zookeeper", "DEPRECATED. The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over. Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is given.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
"Multiple URLS can be given to allow fail-over. Required when configuring SCRAM credentials for users or " +
"dynamic broker configs when the relevant broker(s) are down. Not allowed otherwise.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " +
"This is required for describing and altering broker configs.")
.withRequiredArg
Expand All @@ -752,30 +776,30 @@ object ConfigCommand extends Config {
val allOpt = parser.accepts("all", "List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers)")

val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers/ips)")
.withRequiredArg
.ofType(classOf[String])
.withRequiredArg
.ofType(classOf[String])
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id/ip)")
.withRequiredArg
.ofType(classOf[String])
.withRequiredArg
.ofType(classOf[String])
val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line)")

val nl = System.getProperty("line.separator")
val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Ip + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
"For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Ip + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
val addConfigFile = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
.withRequiredArg
.ofType(classOf[String])
.withRequiredArg
.ofType(classOf[String])
val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
.withRequiredArg
.ofType(classOf[String])
.withValuesSeparatedBy(',')
.withRequiredArg
.ofType(classOf[String])
.withValuesSeparatedBy(',')
val forceOpt = parser.accepts("force", "Suppress console prompts")
val topic = parser.accepts("topic", "The topic's name.")
.withRequiredArg
Expand Down Expand Up @@ -855,7 +879,7 @@ object ConfigCommand extends Config {

entityTypeVals.foreach(entityTypeVal =>
if (!allowedEntityTypes.contains(entityTypeVal))
throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(",")} with the $connectOptString argument")
throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with the $connectOptString argument")
)
if (entityTypeVals.isEmpty)
throw new IllegalArgumentException("At least one entity type must be specified")
Expand All @@ -878,6 +902,12 @@ object ConfigCommand extends Config {
throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all")
}

if (options.has(zkTlsConfigFile) && options.has(bootstrapServerOpt)) {
throw new IllegalArgumentException("--bootstrap-server doesn't support --zk-tls-config-file option. " +
"If you intend the command to communicate directly with ZooKeeper, please use the option --zookeeper instead of --bootstrap-server. " +
"Otherwise, remove the --zk-tls-config-file option.")
}

if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {
Expand Down
Loading

0 comments on commit b615904

Please sign in to comment.