KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298)

Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes, and refactor the tests and classes to use these new properties.

Testing: Unit tests and running the connect-standalone script with a file sink connector.

Author: Arjun Satish <[email protected]>
Author: Randall Hauch <[email protected]>

Reviewers: Konstantine Karantasis <[email protected]>, Magesh Nandakumar <[email protected]>, Robert Yokota <[email protected]>, Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#5125 from wicknicks/KAFKA-6981
wicknicks authored and ewencp committed Jun 5, 2018
1 parent ba0ebca commit faa15b8
Showing 17 changed files with 284 additions and 306 deletions.
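
For illustration only (not part of this commit's diff): a standalone file-sink configuration exercising the new KIP-298 properties might look like the sketch below, where the connector name, class, and topic names are placeholders. Note that at this commit the tolerance setting is keyed "errors.allowed.max" and accepts the values 'none' and 'all'.

import java.util.HashMap;
import java.util.Map;

// Hypothetical connector properties; the keys are the constants defined in this commit.
static Map<String, String> exampleSinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put("name", "local-file-sink");                      // placeholder name
    props.put("connector.class", "FileStreamSinkConnector");   // placeholder class
    props.put("tasks.max", "1");
    props.put("topics", "connect-test");                       // placeholder topic
    props.put("errors.retry.timeout", "30000");                // retry a failed operation for up to 30s
    props.put("errors.retry.delay.max.ms", "5000");            // cap the delay between consecutive retries at 5s
    props.put("errors.allowed.max", "all");                    // tolerate errors by skipping problematic records
    props.put("errors.log.enable", "true");                    // log every handled error
    props.put("errors.log.include.messages", "true");         // include the failing record in the log
    props.put("errors.deadletterqueue.topic.name", "my-dlq");  // sink connectors only
    return props;
}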
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -24,6 +24,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
@@ -36,8 +37,8 @@
import java.util.List;
import java.util.Map;

-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
@@ -54,6 +55,7 @@
public class ConnectorConfig extends AbstractConfig {
protected static final String COMMON_GROUP = "Common";
protected static final String TRANSFORMS_GROUP = "Transforms";
+protected static final String ERROR_GROUP = "Error Handling";

public static final String NAME_CONFIG = "name";
private static final String NAME_DOC = "Globally unique name to use for this connector.";
@@ -106,6 +108,37 @@ public class ConnectorConfig extends AbstractConfig {
public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString();
public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString();

+public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout";
+public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors";
+public static final int ERRORS_RETRY_TIMEOUT_DEFAULT = 0;
+public static final String ERRORS_RETRY_TIMEOUT_DOC = "The maximum duration in milliseconds that a failed operation " +
+"will be reattempted. The default is 0, which means no retries will be attempted. Use -1 for infinite retries.";
+
+public static final String ERRORS_RETRY_MAX_DELAY_CONFIG = "errors.retry.delay.max.ms";
+public static final String ERRORS_RETRY_MAX_DELAY_DISPLAY = "Maximum Delay Between Retries for Errors";
+public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000;
+public static final String ERRORS_RETRY_MAX_DELAY_DOC = "The maximum duration in milliseconds between consecutive retry attempts. " +
+"Jitter will be added to the delay once this limit is reached to prevent thundering herd issues.";
+
+public static final String ERRORS_TOLERANCE_CONFIG = "errors.allowed.max";
+public static final String ERRORS_TOLERANCE_DISPLAY = "Error Tolerance";
+public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
+public static final String ERRORS_TOLERANCE_DOC = "Behavior for tolerating errors during connector operation. 'none' is the default value " +
+"and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.";
+
+public static final String ERRORS_LOG_ENABLE_CONFIG = "errors.log.enable";
+public static final String ERRORS_LOG_ENABLE_DISPLAY = "Log Errors";
+public static final boolean ERRORS_LOG_ENABLE_DEFAULT = false;
+public static final String ERRORS_LOG_ENABLE_DOC = "If true, write each error and the details of the failed operation and problematic record " +
+"to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.";
+
+public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages";
+public static final String ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY = "Log Error Details";
+public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false;
+public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to include in the log the Connect record that resulted in " +
+"a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " +
+"although some information such as topic and partition number will still be logged.";
+
private final EnrichedConnectorConfig enrichedConfig;
private static class EnrichedConnectorConfig extends AbstractConfig {
EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@@ -120,6 +153,7 @@ public Object get(String key) {

public static ConfigDef configDef() {
int orderInGroup = 0;
+int orderInErrorGroup = 0;
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
@@ -138,7 +172,18 @@ public void ensureValid(String name, Object value) {
}), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
-CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY);
+CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
+.define(ERRORS_RETRY_TIMEOUT_CONFIG, Type.LONG, ERRORS_RETRY_TIMEOUT_DEFAULT, Importance.MEDIUM,
+ERRORS_RETRY_TIMEOUT_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_TIMEOUT_DISPLAY)
+.define(ERRORS_RETRY_MAX_DELAY_CONFIG, Type.LONG, ERRORS_RETRY_MAX_DELAY_DEFAULT, Importance.MEDIUM,
+ERRORS_RETRY_MAX_DELAY_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_MAX_DELAY_DISPLAY)
+.define(ERRORS_TOLERANCE_CONFIG, Type.STRING, ERRORS_TOLERANCE_DEFAULT.value(),
+in(ToleranceType.NONE.value(), ToleranceType.ALL.value()), Importance.MEDIUM,
+ERRORS_TOLERANCE_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_TOLERANCE_DISPLAY)
+.define(ERRORS_LOG_ENABLE_CONFIG, Type.BOOLEAN, ERRORS_LOG_ENABLE_DEFAULT, Importance.MEDIUM,
+ERRORS_LOG_ENABLE_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_ENABLE_DISPLAY)
+.define(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, Type.BOOLEAN, ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT, Importance.MEDIUM,
+ERRORS_LOG_INCLUDE_MESSAGES_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY);
}

public ConnectorConfig(Plugins plugins) {
@@ -162,6 +207,32 @@ public Object get(String key) {
return enrichedConfig.get(key);
}

+public long errorRetryTimeout() {
+return getLong(ERRORS_RETRY_TIMEOUT_CONFIG);
+}
+
+public long errorMaxDelayInMillis() {
+return getLong(ERRORS_RETRY_MAX_DELAY_CONFIG);
+}
+
+public ToleranceType errorToleranceType() {
+String tolerance = getString(ERRORS_TOLERANCE_CONFIG);
+for (ToleranceType type: ToleranceType.values()) {
+if (type.name().equalsIgnoreCase(tolerance)) {
+return type;
+}
+}
+return ERRORS_TOLERANCE_DEFAULT;
+}
+
+public boolean enableErrorLog() {
+return getBoolean(ERRORS_LOG_ENABLE_CONFIG);
+}
+
+public boolean includeRecordDetailsInErrorLog() {
+return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG);
+}
+
/**
* Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
*/
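
To see how the pieces above fit together, here is a minimal sketch (assuming the exampleSinkProps() map from the top of this page and a Plugins instance in scope; not part of this commit) that parses the properties and reads back the typed values through the new accessors:

// Sketch only; ConnectorConfig(plugins, props) is the two-arg constructor used by the runtime.
static void logErrorHandlingConfig(Plugins plugins, Map<String, String> props) {
    ConnectorConfig config = new ConnectorConfig(plugins, props);
    long retryTimeoutMs = config.errorRetryTimeout();              // 30000 (errors.retry.timeout)
    long maxDelayMs = config.errorMaxDelayInMillis();              // 5000 (errors.retry.delay.max.ms)
    ToleranceType tolerance = config.errorToleranceType();         // ToleranceType.ALL (errors.allowed.max)
    boolean logEnabled = config.enableErrorLog();                  // true (errors.log.enable)
    boolean logDetails = config.includeRecordDetailsInErrorLog();  // true (errors.log.include.messages)
    System.out.printf("retry=%dms maxDelay=%dms tolerance=%s log=%b details=%b%n",
            retryTimeoutMs, maxDelayMs, tolerance, logEnabled, logDetails);
}

The case-insensitive loop in errorToleranceType() is defensive: the in(...) validator in configDef() already restricts the raw value to 'none' or 'all', so the fallback to ERRORS_TOLERANCE_DEFAULT should be unreachable for a validated config.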
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
@@ -42,9 +43,19 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final String TOPICS_REGEX_DEFAULT = "";
private static final String TOPICS_REGEX_DISPLAY = "Topics regex";

+public static final String DLQ_PREFIX = "errors.deadletterqueue.";
+
+public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
+public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that " +
+"result in an error when processed by this sink connector, or its transformations or converters. The topic name is blank by default, " +
+"which means that no messages are to be recorded in the DLQ.";
+public static final String DLQ_TOPIC_DEFAULT = "";
+private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
+
static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
-.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY);
+.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY);

public static ConfigDef configDef() {
return config;
@@ -83,4 +94,7 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) {
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
}

+public String dlqTopicName() {
+return getString(DLQ_TOPIC_NAME_CONFIG);
+}
}
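
And the sink-side counterpart, again a sketch under the same assumptions (not part of this commit). The DLQ setting lives on SinkConnectorConfig because a dead letter queue only makes sense for sink connectors, and the blank default disables it:

static boolean dlqEnabled(Plugins plugins, Map<String, String> props) {
    SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, props);
    String dlqTopic = sinkConfig.dlqTopicName();    // "my-dlq" with the example props; "" by default
    return dlqTopic != null && !dlqTopic.isEmpty(); // mirrors the check in Worker.sinkTaskReporters below
}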
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -485,8 +485,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
ClassLoader loader) {
ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);

-RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
-retryWithToleranceOperator.configure(connConfig.originalsWithPrefix("errors."));
+RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
+connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
retryWithToleranceOperator.metrics(errorHandlingMetrics);

// Decide which type of worker task we need based on the type of task.
@@ -505,7 +505,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
time, retryWithToleranceOperator);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics));
+SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
+retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, headerConverter, transformationChain, loader, time,
retryWithToleranceOperator);
@@ -519,19 +520,17 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
return new ErrorHandlingMetrics(id, metrics);
}

-private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
-ErrorHandlingMetrics errorHandlingMetrics) {
+private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig,
+ErrorHandlingMetrics errorHandlingMetrics) {
ArrayList<ErrorReporter> reporters = new ArrayList<>();
-LogReporter logReporter = new LogReporter(id);
-logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+LogReporter logReporter = new LogReporter(id, connConfig);
logReporter.metrics(errorHandlingMetrics);
reporters.add(logReporter);

// check if topic for dead letter queue exists
-String topic = connConfig.getString(DeadLetterQueueReporter.PREFIX + "." + DeadLetterQueueReporter.DLQ_TOPIC_NAME);
+String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
-reporter.configure(connConfig.originalsWithPrefix(DeadLetterQueueReporter.PREFIX + "."));
reporters.add(reporter);
}

@@ -541,8 +540,7 @@ private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig
private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
ErrorHandlingMetrics errorHandlingMetrics) {
List<ErrorReporter> reporters = new ArrayList<>();
-LogReporter logReporter = new LogReporter(id);
-logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+LogReporter logReporter = new LogReporter(id, connConfig);
logReporter.metrics(errorHandlingMetrics);
reporters.add(logReporter);

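
Two things happen in this file: the operator is now constructed from the typed accessors instead of a raw originalsWithPrefix("errors.") map, and the sink reporters are built from a SinkConnectorConfig so the DLQ topic can be read directly. The retry schedule itself lives in RetryWithToleranceOperator, which is out of frame here; purely as an illustration of the contract documented by errors.retry.delay.max.ms (exponential growth capped at the configured maximum, with jitter once the cap is reached), a delay computation of that shape could look like the following. This is not the PR's implementation, and its constants are assumptions:

import java.util.concurrent.ThreadLocalRandom;

// Illustration only; the real schedule is implemented in RetryWithToleranceOperator
// and may differ in its constants and details.
static long backoffMs(int attempt, long maxDelayMs) {
    long base = 300L;                                           // assumed initial delay
    long delay = Math.min(base << Math.min(attempt, 32), maxDelayMs);
    if (delay < maxDelayMs) {
        return delay;                                           // still growing exponentially
    }
    // once capped, randomize fully to avoid a thundering herd of simultaneous retries
    return ThreadLocalRandom.current().nextLong(maxDelayMs + 1);
}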
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -21,12 +21,10 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,22 +47,14 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;

-public static final String PREFIX = "errors.deadletterqueue";
+private final SinkConnectorConfig connConfig;

-public static final String DLQ_TOPIC_NAME = "topic.name";
-public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic where these messages are written to.";
-public static final String DLQ_TOPIC_DEFAULT = "";
-
-private DeadLetterQueueReporterConfig config;
private KafkaProducer<byte[], byte[]> kafkaProducer;
private ErrorHandlingMetrics errorHandlingMetrics;

-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC);
-
public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
-ConnectorConfig connConfig, Map<String, Object> producerProps) {
-String topic = connConfig.getString(PREFIX + "." + DLQ_TOPIC_NAME);
+SinkConnectorConfig connConfig, Map<String, Object> producerProps) {
+String topic = connConfig.dlqTopicName();

try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
if (!admin.listTopics().names().get().contains(topic)) {
@@ -81,7 +71,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
}

KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-return new DeadLetterQueueReporter(dlqProducer);
+return new DeadLetterQueueReporter(dlqProducer, connConfig);
}

/**
@@ -90,13 +80,9 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
* @param kafkaProducer a Kafka Producer to produce the original consumed records.
*/
// Visible for testing
-DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer) {
+DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) {
this.kafkaProducer = kafkaProducer;
-}
-
-@Override
-public void configure(Map<String, ?> configs) {
-config = new DeadLetterQueueReporterConfig(configs);
+this.connConfig = connConfig;
}

@Override
@@ -110,7 +96,8 @@ public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
* @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
*/
public void report(ProcessingContext context) {
-if (config.topic().isEmpty()) {
+final String dlqTopicName = connConfig.dlqTopicName();
+if (dlqTopicName.isEmpty()) {
return;
}

@@ -124,31 +111,18 @@ public void report(ProcessingContext context) {

ProducerRecord<byte[], byte[]> producerRecord;
if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
-producerRecord = new ProducerRecord<>(config.topic(), null,
+producerRecord = new ProducerRecord<>(dlqTopicName, null,
originalMessage.key(), originalMessage.value(), originalMessage.headers());
} else {
-producerRecord = new ProducerRecord<>(config.topic(), null, originalMessage.timestamp(),
+producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
originalMessage.key(), originalMessage.value(), originalMessage.headers());
}

this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
log.error("Could not produce message to dead letter queue. topic=" + config.topic(), exception);
log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
}
});
}
-
-static class DeadLetterQueueReporterConfig extends AbstractConfig {
-public DeadLetterQueueReporterConfig(Map<?, ?> originals) {
-super(CONFIG_DEF, originals, true);
-}
-
-/**
-* @return name of the dead letter queue topic.
-*/
-public String topic() {
-return getString(DLQ_TOPIC_NAME);
-}
-}
}
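
Two details of the refactored reporter worth noting. First, the timestamp branch in report() exists because ProducerRecord rejects negative timestamps and RecordBatch.NO_TIMESTAMP is the -1 sentinel, so records without a timestamp are sent with a null timestamp and the producer assigns one. Second, with configure() gone, the reporter is usable immediately after construction. A minimal usage sketch (not from this commit; it assumes producerProps carries byte-array serializers, and it must live in the same package since the constructor is package-private):

// Sketch only: mirrors what createAndSetup(...) does after ensuring the topic exists.
static DeadLetterQueueReporter buildReporter(Map<String, Object> producerProps,
                                             SinkConnectorConfig sinkConfig,
                                             ErrorHandlingMetrics errorHandlingMetrics) {
    KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
    DeadLetterQueueReporter reporter = new DeadLetterQueueReporter(dlqProducer, sinkConfig);
    reporter.metrics(errorHandlingMetrics); // wire metrics before reporting
    return reporter;                        // report(context) forwards context.consumerRecord() to the DLQ, if configured
}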
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.connect.runtime.errors;

-import org.apache.kafka.common.Configurable;
-
/**
* Report an error using the information contained in the {@link ProcessingContext}.
*/
-public interface ErrorReporter extends Configurable {
+public interface ErrorReporter {

/**
* Report an error.
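
With the Configurable superinterface removed, implementations now receive their collaborators at construction time instead of through configure(). A hypothetical implementation (not part of this commit) showing the resulting shape:

// Hypothetical reporter that only records a metric per failure.
public class MetricsOnlyReporter implements ErrorReporter {

    private final ErrorHandlingMetrics metrics;

    public MetricsOnlyReporter(ErrorHandlingMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public void report(ProcessingContext context) {
        metrics.recordFailure(); // assumed hook on ErrorHandlingMetrics
    }
}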