Skip to content

Commit

Permalink
KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (a…
Browse files Browse the repository at this point in the history
…pache#5270)

Co-authored-by: Sumant Tambe <[email protected]>
Co-authored-by: Yu Yang <[email protected]>

Reviewers: Ted Yu <[email protected]>, Apurva Mehta <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
yuyang08 authored and hachikuji committed Jul 26, 2018
1 parent 1d9a427 commit 7fc7136
Show file tree
Hide file tree
Showing 22 changed files with 900 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@
*/
package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
Expand All @@ -24,6 +35,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
Expand Down Expand Up @@ -69,18 +81,6 @@
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;

/**
Expand Down Expand Up @@ -235,6 +235,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";

private final String clientId;
// Visible for testing
Expand Down Expand Up @@ -392,18 +393,21 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null, log);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
config.getInt(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager);
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (metadata != null) {
this.metadata = metadata;
Expand Down Expand Up @@ -459,10 +463,30 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
}
}

private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
/**
 * Resolve the effective <code>delivery.timeout.ms</code> for this producer and validate it against
 * <code>linger.ms</code> + <code>request.timeout.ms</code>.
 *
 * <ul>
 *   <li>If the configured delivery timeout is at least <code>linger.ms + request.timeout.ms</code>, it is returned as is.</li>
 *   <li>If it is smaller and the user explicitly supplied it, a {@link ConfigException} is thrown.</li>
 *   <li>If it is smaller and it comes from the default, it is raised to <code>linger.ms + request.timeout.ms</code>
 *       and a warning is logged.</li>
 * </ul>
 *
 * @param config the producer configuration to read the three timeouts from
 * @param log    logger used to report that the default delivery timeout was overridden
 * @return the delivery timeout, in milliseconds, to use
 * @throws ConfigException if an explicitly configured delivery timeout is smaller than
 *                         <code>linger.ms + request.timeout.ms</code>
 */
private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
    int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
    int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
    int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);

    // Add in long arithmetic and clamp to Integer.MAX_VALUE. A plain int addition wraps to a negative
    // number for large values, which would make the check below pass for an inconsistent configuration.
    // Because the sum is clamped, a delivery timeout of Integer.MAX_VALUE can never be "too small",
    // so no separate guard for that value is needed.
    int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);

    if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
        if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
            // throw an exception if the user explicitly set an inconsistent value
            throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
                + " should be equal to or larger than " + ProducerConfig.LINGER_MS_CONFIG
                + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        } else {
            // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
            deliveryTimeoutMs = lingerAndRequestTimeoutMs;
            log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
                ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
                ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
        }
    }
    return deliveryTimeoutMs;
}

private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;

boolean userConfiguredIdempotence = false;
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
userConfiguredIdempotence = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ public class ProducerConfig extends AbstractConfig {
+ "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
+ "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";

/** <code>request.timeout.ms</code> */
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ " This should be larger than replica.lag.time.max.ms (a broker configuration)"
+ " to reduce the possibility of message duplication due to unnecessary producer retries.";

/** <code>delivery.timeout.ms</code> */
public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
// Note the leading space in " should be equal to or ": without it the config name and the following
// word run together in the rendered documentation ("delivery.timeout.msshould ...").
private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure after Producer.send() returns. "
        + "Producer may report failure to send a message earlier than this config if all the retries are exhausted or "
        + "a record is added to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + " should be equal to or "
        + "greater than " + REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;

/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;

Expand Down Expand Up @@ -188,12 +201,6 @@ public class ProducerConfig extends AbstractConfig {
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";

/** <code>request.timeout.ms</code> */
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ " This should be larger than replica.lag.time.max.ms (a broker configuration)"
+ " to reduce the possibility of message duplication due to unnecessary producer retries.";

/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
Expand Down Expand Up @@ -224,7 +231,7 @@ public class ProducerConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
Expand All @@ -233,7 +240,8 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
Expand Down Expand Up @@ -77,13 +76,13 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
private boolean retry;
private boolean reopened = false;

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
this(tp, recordsBuilder, now, false);
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
this(tp, recordsBuilder, createdMs, false);
}

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
this.createdMs = now;
this.lastAttemptMs = now;
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
this.createdMs = createdMs;
this.lastAttemptMs = createdMs;
this.recordsBuilder = recordsBuilder;
this.topicPartition = tp;
this.lastAppendTime = createdMs;
Expand Down Expand Up @@ -158,34 +157,52 @@ public void abort(RuntimeException exception) {
}

/**
* Complete the request. If the batch was previously aborted, this is a no-op.
* Finalize the state of a batch. Final state, once set, is immutable. This function may be called
* once or twice on a batch. It may be called twice if
* 1. An inflight batch expires before a response from the broker is received. The batch's final
* state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
* try to set SUCCEEDED final state.
* 2. A transaction is aborted or the producer is closed forcefully. The batch's final state is set to
* ABORTED, but the batch may still succeed if the broker later responds with a success.
*
* Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
* Attempted transitions from one failure state to the same or a different failed state are ignored.
* Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
*
* @param baseOffset The base offset of the messages assigned by the server
* @param logAppendTime The log append time or -1 if CreateTime is being used
* @param exception The exception that occurred (or null if the request was successful)
* @return true if this call set the batch's final state and completed its future, false if the batch had already reached a final state
*/
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
final FinalState finalState;
if (exception == null) {
final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;

if (tryFinalState == FinalState.SUCCEEDED) {
log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
finalState = FinalState.SUCCEEDED;
} else {
log.trace("Failed to produce messages to {}.", topicPartition, exception);
finalState = FinalState.FAILED;
log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
}

if (this.finalState.compareAndSet(null, tryFinalState)) {
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
return true;
}

if (!this.finalState.compareAndSet(null, finalState)) {
if (this.finalState.get() == FinalState.ABORTED) {
log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
return false;
if (this.finalState.get() != FinalState.SUCCEEDED) {
if (tryFinalState == FinalState.SUCCEEDED) {
// Log if a previously unsuccessful batch succeeded later on.
log.debug("ProduceResponse returned {} for {} after batch with base offset {} had already been {}.",
tryFinalState, topicPartition, baseOffset, this.finalState.get());
} else {
throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
// FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
log.debug("Ignored state transition {} -> {} for {} batch with base offset {}",
this.finalState.get(), tryFinalState, topicPartition, baseOffset);
}
} else {
// A SUCCESSFUL batch must not attempt another state change.
throw new IllegalStateException("A " + this.finalState.get() + " batch must not attempt another state change to " + tryFinalState);
}

completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
return true;
return false;
}

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
Expand Down Expand Up @@ -299,37 +316,12 @@ public String toString() {
return "ProducerBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
}

/**
* A batch whose metadata is not available should be expired if one of the following is true:
* <ol>
* <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached).
* <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
* </ol>
* This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
*/
boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

boolean expired = expiryErrorMessage != null;
if (expired)
abortRecordAppends();
return expired;
/**
 * Check whether this batch has been alive for at least the delivery timeout.
 *
 * @param deliveryTimeoutMs the delivery timeout in milliseconds
 * @param now               the current time in milliseconds
 * @return true if the time elapsed since the batch was created is greater than or equal to
 *         {@code deliveryTimeoutMs}
 */
boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
    final long elapsedSinceCreationMs = now - this.createdMs;
    return elapsedSinceCreationMs >= deliveryTimeoutMs;
}

/**
* If {@link #maybeExpire(int, long, long, long, boolean)} returned true, the sender will fail the batch with
* the exception returned by this method.
* @return An exception indicating the batch expired.
*/
TimeoutException timeoutException() {
if (expiryErrorMessage == null)
throw new IllegalStateException("Batch has not expired");
return new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage);
/**
 * Read the final state currently recorded for this batch.
 *
 * @return the value held by the final-state reference; presumably null until the batch has been
 *         completed or aborted (the reference's initial value is not visible here)
 */
public FinalState finalState() {
    final FinalState current = this.finalState.get();
    return current;
}

int attempts() {
Expand All @@ -347,10 +339,6 @@ long queueTimeMs() {
return drainedMs - createdMs;
}

long createdTimeMs(long nowMs) {
return Math.max(0, nowMs - createdMs);
}

/**
 * Time elapsed since the last attempt timestamp of this batch, never negative.
 *
 * @param nowMs the current time in milliseconds
 * @return {@code nowMs - lastAttemptMs}, or 0 if that difference is negative
 */
long waitedTimeMs(long nowMs) {
    final long sinceLastAttemptMs = nowMs - lastAttemptMs;
    return sinceLastAttemptMs > 0 ? sinceLastAttemptMs : 0;
}
Expand Down Expand Up @@ -467,5 +455,4 @@ public boolean isTransactional() {
/**
 * Report the value of this batch's {@code reopened} flag.
 *
 * @return true if the batch has been marked as reopened; judging by the method name this signals
 *         that the batch's sequence was reset (where the flag is set is not visible here)
 */
public boolean sequenceHasBeenReset() {
    return this.reopened;
}

}
Loading

0 comments on commit 7fc7136

Please sign in to comment.