STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag

Change-Id: Id6e3ce120cc813adbe611d085cd4bc3ebd0ff590
arunmahadevan committed Nov 13, 2018
1 parent 40e24ce commit fa0b862
Showing 3 changed files with 77 additions and 16 deletions.
@@ -692,11 +692,28 @@ public Map<String, Object> getComponentConfiguration() {
         configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
-        configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
-        configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
+        for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+            if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
+                configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+            }
+        }
         return configuration;
     }
+
+    private boolean isPrimitiveOrWrapper(Class<?> type) {
+        if (type == null) {
+            return false;
+        }
+        return type.isPrimitive() || isWrapper(type);
+    }
+
+    private boolean isWrapper(Class<?> type) {
+        return type == Double.class || type == Float.class || type == Long.class
+            || type == Integer.class || type == Short.class || type == Character.class
+            || type == Byte.class || type == Boolean.class || type == String.class;
+    }
+
 
     private String getTopicsString() {
         return kafkaSpoutConfig.getTopicFilter().getTopicsString();
     }
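A note on the hunk above: only values whose runtime class is a primitive wrapper (or String) survive the isPrimitiveOrWrapper check, so non-serializable objects in the Kafka props, such as deserializer or interceptor instances, stay out of the reported component configuration. A minimal standalone sketch of the same filtering; the "config." prefix and the sample property values are assumptions, not taken from this diff:

import java.util.HashMap;
import java.util.Map;

public class KafkaPropsFilterDemo {
    // Mirrors the spout's check above: primitives, their wrappers, and String pass through.
    static boolean isPrimitiveOrWrapper(Class<?> type) {
        if (type == null) {
            return false;
        }
        return type.isPrimitive() || type == Double.class || type == Float.class
            || type == Long.class || type == Integer.class || type == Short.class
            || type == Character.class || type == Byte.class || type == Boolean.class
            || type == String.class;
    }

    public static void main(String[] args) {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put("bootstrap.servers", "hostname1:9092"); // String -> forwarded
        kafkaProps.put("max.poll.records", 500);                // Integer -> forwarded
        kafkaProps.put("interceptor.instance", new Object());   // arbitrary object -> skipped
        kafkaProps.put("sasl.jaas.config", null);               // null -> skipped

        String configKeyPrefix = "config."; // assumption: the real prefix is defined outside this diff
        Map<String, Object> configuration = new HashMap<>();
        for (Map.Entry<String, Object> conf : kafkaProps.entrySet()) {
            if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
                configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
            }
        }
        System.out.println(configuration);
        // e.g. {config.max.poll.records=500, config.bootstrap.servers=hostname1:9092}
    }
}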
@@ -114,8 +114,8 @@ private static Options buildOptions() {
             "consumer/spout e.g. hostname1:9092,hostname2:9092");
         options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
-        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful "
-            + "when connecting to secure kafka");
+        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
+            "Kafka consumer properties");
         return options;
     }

@@ -136,10 +136,10 @@ public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
             props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
-            // Read Kafka property file for extra security options
-            if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
-                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
-            }
         }
+        // Read property file for extra consumer properties
+        if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+            props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
+        }
         List<TopicPartition> topicPartitionList = new ArrayList<>();
         consumer = new KafkaConsumer<>(props);
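The hunk above moves the consumer-config load out of the security-protocol branch, so the extra properties file is now applied unconditionally, and, being applied last, it can override the monitor's defaults. A sketch of that precedence under assumed inputs; java.util.Properties.load stands in for Storm's Utils.loadProps, and the demo.* system properties are hypothetical:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class ConsumerPropsPrecedenceDemo {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Defaults the monitor always sets.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String securityProtocol = System.getProperty("demo.security.protocol"); // hypothetical input
        if (securityProtocol != null) {
            props.put("security.protocol", securityProtocol);
        }

        // The extra properties file is now loaded even when no security protocol is set,
        // and, being applied last, can override any default above.
        String consumerConfig = System.getProperty("demo.consumer.config"); // hypothetical path input
        if (consumerConfig != null) {
            try (FileInputStream in = new FileInputStream(consumerConfig)) {
                props.load(in); // stand-in for Utils.loadProps(consumerConfig)
            }
        }
        System.out.println(props);
    }
}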
60 changes: 52 additions & 8 deletions storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -17,11 +17,17 @@
 package org.apache.storm.utils;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
@@ -41,6 +47,9 @@ public class TopologySpoutLag {
     private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
     private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
     private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
+    private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
+    private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
+        BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
     private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
 
     public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> topologyConf) {
@@ -68,14 +77,38 @@ private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Ob
         commands.add((String) jsonConf.get(GROUPID_CONFIG));
         commands.add("-b");
         commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
-        String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
+        String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
         if (securityProtocol != null && !securityProtocol.isEmpty()) {
             commands.add("-s");
             commands.add(securityProtocol);
         }
         return commands;
     }
 
+    private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+        File file = null;
+        Map<String, String> extraProperties = new HashMap<>();
+        for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
+            if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) {
+                extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString());
+            }
+        }
+        if (!extraProperties.isEmpty()) {
+            try {
+                file = File.createTempFile("kafka-consumer-extra", "props");
+                file.deleteOnExit();
+                Properties properties = new Properties();
+                properties.putAll(extraProperties);
+                try(FileOutputStream fos = new FileOutputStream(file)) {
+                    properties.store(fos, "Kafka consumer extra properties");
+                }
+            } catch (IOException ex) {
+                // ignore
+            }
+        }
+        return file;
+    }
+
     private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec)
         throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
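For reference, getExtraPropertiesFile partitions the prefixed spout configs: the four keys in ALL_CONFIGS travel as dedicated command-line flags, and every other prefixed key is stripped of the prefix and round-tripped through a temporary java.util.Properties file that the monitor later loads. A self-contained sketch of that round trip, assuming a "config." prefix (CONFIG_KEY_PREFIX is defined outside this diff) and hypothetical property values; plain Properties.load stands in for Storm's Utils.loadProps:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class ExtraPropsRoundTrip {
    public static void main(String[] args) throws IOException {
        String prefix = "config."; // assumption: stands in for CONFIG_KEY_PREFIX
        // Keys handled by dedicated command-line flags, excluded from the file.
        Set<String> allConfigs = new HashSet<>(Arrays.asList(prefix + "topics", prefix + "groupid",
            prefix + "bootstrap.servers", prefix + "security.protocol"));

        Map<String, Object> jsonConf = new HashMap<>();
        jsonConf.put(prefix + "bootstrap.servers", "hostname1:9092"); // excluded: has its own flag
        jsonConf.put(prefix + "session.timeout.ms", 30000);           // extra: goes into the file
        jsonConf.put("topology.name", "demo");                        // not prefixed: ignored

        Map<String, String> extraProperties = new HashMap<>();
        for (Map.Entry<String, Object> conf : jsonConf.entrySet()) {
            if (conf.getKey().startsWith(prefix) && !allConfigs.contains(conf.getKey())) {
                extraProperties.put(conf.getKey().substring(prefix.length()), conf.getValue().toString());
            }
        }

        File file = File.createTempFile("kafka-consumer-extra", "props");
        file.deleteOnExit();
        Properties out = new Properties();
        out.putAll(extraProperties);
        try (FileOutputStream fos = new FileOutputStream(file)) {
            out.store(fos, "Kafka consumer extra properties");
        }

        // The monitor side reads the same file back (Utils.loadProps in Storm; plain load here).
        Properties in = new Properties();
        try (FileInputStream fis = new FileInputStream(file)) {
            in.load(fis);
        }
        System.out.println(in); // {session.timeout.ms=30000}
    }
}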
@@ -116,18 +149,29 @@ private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpe
         }
         commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
+        File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+        if (extraPropertiesFile != null) {
+            commands.add("-c");
+            commands.add(extraPropertiesFile.getAbsolutePath());
+        }
         logger.debug("Command to run: {}", commands);
 
         // if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client
         if (!commands.contains(null)) {
-            String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
-
-            try {
-                result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
-            } catch (ParseException e) {
-                logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
-                // json parsing fail -> error received
-                errorMsg = resultFromMonitor;
+            try {
+                String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
+
+                try {
+                    result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+                } catch (ParseException e) {
+                    logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
+                    // json parsing fail -> error received
+                    errorMsg = resultFromMonitor;
+                }
+            } finally {
+                if (extraPropertiesFile != null) {
+                    extraPropertiesFile.delete();
+                }
             }
         }
     }
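The try/finally above guarantees the temporary file passed via -c is deleted even when the monitor command or the JSON parse fails, while the parse keeps its existing contract: any output that is not valid JSON is surfaced as an error message. A small sketch of that contract using the same json-simple calls; the sample monitor output is hypothetical:

import java.util.Map;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;

public class MonitorOutputDemo {
    public static void main(String[] args) {
        // Hypothetical monitor output; the real JSON shape is defined by the monitor tool.
        String resultFromMonitor = "{\"spoutId\":\"kafka-spout\",\"lag\":0}";
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
            System.out.println(result);
        } catch (ParseException e) {
            // Non-JSON output is treated as an error message, as in the diff above.
            System.err.println("Monitor returned error: " + resultFromMonitor);
        }
    }
}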