Skip to content

Commit

Permalink
KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is…
Browse files Browse the repository at this point in the history
… closed (apache#5027)

After successful completion of KafkaProducer#close, it is possible that an application calls KafkaProducer#send. If the send is invoked for a topic for which we do not have any metadata, the producer will block until `max.block.ms` elapses; we do not expect to receive any metadata update in this case because the Sender (and NetworkClient) have already exited. It is only when RecordAccumulator#append is invoked that we notice that the producer has already been closed and throw an exception. If `max.block.ms` is set to `Long.MAX_VALUE` (or a sufficiently high value in general), the producer could block awaiting metadata indefinitely.

This patch makes sure `Metadata#awaitUpdate` periodically checks if the network client has been closed, and if so bails out as soon as possible.
  • Loading branch information
dhruvilshah3 authored and hachikuji committed Jul 21, 2018
1 parent 591954e commit d11f6f2
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no
public void requestUpdate() {
// Do nothing
}

@Override
public void close() {
// Do nothing
}
}
35 changes: 31 additions & 4 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
Expand All @@ -25,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -48,7 +50,7 @@
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
*/
public final class Metadata {
public final class Metadata implements Closeable {

private static final Logger log = LoggerFactory.getLogger(Metadata.class);

Expand All @@ -70,6 +72,7 @@ public final class Metadata {
private boolean needMetadataForAllTopics;
private final boolean allowAutoTopicCreation;
private final boolean topicExpiryEnabled;
private boolean isClosed;

public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
Expand Down Expand Up @@ -100,6 +103,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoT
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
this.needMetadataForAllTopics = false;
this.isClosed = false;
}

/**
Expand Down Expand Up @@ -164,12 +168,12 @@ public synchronized AuthenticationException getAndClearAuthenticationException()
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
if (maxWaitMs < 0)
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
}

long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
while ((this.version <= lastVersion) && !isClosed()) {
AuthenticationException ex = getAndClearAuthenticationException();
if (ex != null)
throw ex;
Expand All @@ -180,6 +184,8 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
if (isClosed())
throw new KafkaException("Requested metadata update after close");
}

/**
Expand Down Expand Up @@ -224,6 +230,8 @@ public synchronized boolean containsTopic(String topic) {
*/
public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(newCluster, "cluster should not be null");
if (isClosed())
throw new IllegalStateException("Update requested after metadata close");

this.needUpdate = false;
this.lastRefreshMs = now;
Expand Down Expand Up @@ -331,6 +339,25 @@ public synchronized void removeListener(Listener listener) {
this.listeners.remove(listener);
}

/**
 * Marks this metadata instance as closed, signalling that no further metadata updates will arrive.
 * Intended for the thread that performs metadata updates to call on its way out, so that other
 * threads waiting for an update learn that none is coming.
 */
@Override
public synchronized void close() {
    isClosed = true;
    // Wake every thread waiting on this object's monitor so it can observe the closed flag.
    notifyAll();
}

/**
 * Reports whether {@link #close()} has been called on this metadata instance.
 *
 * @return {@code true} once this instance has been closed, {@code false} before that
 */
public synchronized boolean isClosed() {
    return isClosed;
}

/**
* MetadataUpdate Listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;

import java.io.Closeable;
import java.util.List;

/**
Expand All @@ -29,7 +30,7 @@
* <p>
* This class is not thread-safe!
*/
public interface MetadataUpdater {
public interface MetadataUpdater extends Closeable {

/**
* Gets the current cluster info without blocking.
Expand Down Expand Up @@ -82,4 +83,10 @@ public interface MetadataUpdater {
* start of the update if possible (see `maybeUpdate` for more information).
*/
void requestUpdate();

/**
 * Close this updater.
 * <p>
 * Overrides {@link Closeable#close()} without declaring a checked exception, so callers of this
 * interface are not required to handle {@code IOException}.
 */
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ public void wakeup() {
/**
 * Close this client: first the selector, then the metadata updater.
 * <p>
 * The metadata updater is closed even when closing the selector throws, so the updater is always
 * told that this client has shut down. Any exception from the selector still propagates.
 */
@Override
public void close() {
    try {
        this.selector.close();
    } finally {
        // Must run regardless of the selector's outcome; otherwise the updater is never closed.
        this.metadataUpdater.close();
    }
}

/**
Expand Down Expand Up @@ -981,6 +982,11 @@ public void requestUpdate() {
this.metadata.requestUpdate();
}

/**
 * Closes this updater by closing the metadata instance it wraps.
 */
@Override
public void close() {
    metadata.close();
}

/**
* Return true if at least one connection establishment is currently underway
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no
public void requestUpdate() {
AdminMetadataManager.this.requestUpdate();
}

@Override
public void close() {
// Do nothing
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException fatal error indicating that the producer is not allowed to write
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
* when send is invoked after producer has been closed.
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
*
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
Expand All @@ -804,14 +804,29 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
return doSend(interceptedRecord, callback);
}

/**
 * Guards operations that require an open producer. The producer is treated as closed when the
 * I/O thread is absent or no longer alive.
 *
 * @throws IllegalStateException if the producer has already been closed
 */
private void throwIfProducerClosed() {
    boolean ioThreadRunning = ioThread != null && ioThread.isAlive();
    if (!ioThreadRunning) {
        throw new IllegalStateException("Cannot perform operation after producer has been closed");
    }
}

/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
Expand Down Expand Up @@ -896,6 +911,7 @@ private void setReadOnly(Headers headers) {
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Expand Down Expand Up @@ -1016,8 +1032,9 @@ public void flush() {
* Get the partition metadata for the given topic. This can be used for custom partitioning.
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws InterruptException If the thread is interrupted while blocked
* @throws InterruptException if the thread is interrupted while blocked
* @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,6 @@ public void close() {

@Override
public void close(long timeout, TimeUnit timeUnit) {
if (this.closed) {
throw new IllegalStateException("MockProducer is already closed.");
}
this.closed = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -195,7 +196,7 @@ public RecordAppendResult append(TopicPartition tp,
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
Expand All @@ -209,7 +210,7 @@ public RecordAppendResult append(TopicPartition tp,
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
throw new KafkaException("Producer closed while send in progress");

RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
Expand Down Expand Up @@ -700,7 +701,7 @@ public void abortIncompleteBatches() {
* Go through incomplete batches and abort them.
*/
private void abortBatches() {
abortBatches(new IllegalStateException("Producer is closed forcefully."));
abortBatches(new KafkaException("Producer is closed forcefully."));
}

/**
Expand Down
40 changes: 34 additions & 6 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
Expand All @@ -46,7 +47,7 @@ public class MetadataTest {
private long refreshBackoffMs = 100;
private long metadataExpireMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
private AtomicReference<String> backgroundError = new AtomicReference<>();
private AtomicReference<Exception> backgroundError = new AtomicReference<>();

@After
public void tearDown() {
Expand Down Expand Up @@ -83,6 +84,30 @@ public void testMetadata() throws Exception {
assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
}

/**
 * Verifies that a thread asking for a metadata update after {@code close()} fails with a
 * {@link KafkaException} carrying the "Requested metadata update after close" message.
 */
@Test
public void testMetadataAwaitAfterClose() throws InterruptedException {
    long time = 0;
    metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
    assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
    metadata.requestUpdate();
    assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
    time += refreshBackoffMs;
    assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);

    String topic = "my-topic";
    metadata.close();
    Thread t1 = asyncFetch(topic, 500);
    t1.join();

    // Read the recorded error once and check for null first, so a missing exception is reported
    // as a clear assertion failure rather than a NullPointerException.
    Exception error = backgroundError.get();
    assertTrue("Expected the background fetch to fail once metadata is closed", error != null);
    assertTrue("Unexpected error type: " + error, error.getClass() == KafkaException.class);
    assertTrue(error.toString().contains("Requested metadata update after close"));
    clearBackgroundError();
}

/**
 * Verifies that applying a metadata update after {@code close()} is rejected with an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testMetadataUpdateAfterClose() {
    metadata.close();
    final long now = 1000;
    metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
}

private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
long now = 10000;

Expand Down Expand Up @@ -409,15 +434,18 @@ public void testNonExpiringMetadata() throws Exception {
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
}

/**
 * Resets the recorded background-thread error to {@code null}.
 */
private void clearBackgroundError() {
    this.backgroundError.set(null);
}

private Thread asyncFetch(final String topic, final long maxWaitMs) {
Thread thread = new Thread() {
public void run() {
while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
try {
try {
while (metadata.fetch().partitionsForTopic(topic).isEmpty())
metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs);
} catch (Exception e) {
backgroundError.set(e.toString());
}
} catch (Exception e) {
backgroundError.set(e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ public ClientRequest newClientRequest(String nodeId,

/**
 * Closes this client by closing its metadata instance, if one is present.
 */
@Override
public void close() {
    // NOTE(review): the null guard assumes this client can be constructed without a Metadata
    // instance -- confirm against the constructors; it is harmless if metadata is never null.
    if (metadata != null)
        metadata.close();
}

@Override
Expand Down
Loading

0 comments on commit d11f6f2

Please sign in to comment.