Skip to content

Commit

Permalink
MINOR: Update consumer javadoc for invalid operations on unassigned p…
Browse files Browse the repository at this point in the history
…artitions (apache#5005)

Document cases where  `IllegalStateException` is raised when attempting an invalid operation on an unassigned partition. Also change `position()` to raise `IllegalStateException` when called on an unassigned partition for consistency.
  • Loading branch information
omkreddy authored and hachikuji committed May 28, 2018
1 parent b751321 commit 8bf20bb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1401,8 +1401,8 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*
* @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
* or if provided offset is negative
* @throws IllegalArgumentException if the provided offset is negative
* @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
*/
@Override
public void seek(TopicPartition partition, long offset) {
Expand All @@ -1423,7 +1423,8 @@ public void seek(TopicPartition partition, long offset) {
* first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
* If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
*
* @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
public void seekToBeginning(Collection<TopicPartition> partitions) {
if (partitions == null)
Expand All @@ -1449,7 +1450,8 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
* If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
* of the first message with an open transaction.
*
* @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
public void seekToEnd(Collection<TopicPartition> partitions) {
if (partitions == null)
Expand All @@ -1476,7 +1478,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
*
* @param partition The partition to get the position for
* @return The current position of the consumer (that is, the offset of the next record to be fetched)
* @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
* @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
* the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
Expand All @@ -1492,7 +1494,7 @@ public long position(TopicPartition partition) {
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
while (offset == null) {
// batch update fetch positions for any partitions without a valid position
Expand Down Expand Up @@ -1614,7 +1616,7 @@ public Map<String, List<PartitionInfo>> listTopics() {
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
* @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void pause(Collection<TopicPartition> partitions) {
Expand All @@ -1634,7 +1636,7 @@ public void pause(Collection<TopicPartition> partitions) {
* {@link #poll(Duration)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed
* @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void resume(Collection<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))

// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalArgumentException] {
intercept[IllegalStateException] {
this.consumers.head.position(new TopicPartition(topic, 15))
}

Expand Down

0 comments on commit 8bf20bb

Please sign in to comment.