Skip to content

Commit

Permalink
KAFKA-14316; Fix feature control iterator metadata version handling (a…
Browse files Browse the repository at this point in the history
…pache#12765)

The iterator `FeatureControlIterator.hasNext()` checks two conditions: 1) whether we have already written the metadata version, and 2) whether the underlying iterator has additional records. However, in `next()`, we also check that the metadata version is at least high enough to include it in the log. When this fails, then we can see an unexpected `NoSuchElementException` if the underlying iterator is empty.

Reviewers: Colin Patrick McCabe <[email protected]>
  • Loading branch information
hachikuji authored Oct 18, 2022
1 parent 7dc1790 commit a692223
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,22 +320,25 @@ class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {

@Override
public boolean hasNext() {
return !wroteVersion || iterator.hasNext();
return needsWriteMetadataVersion() || iterator.hasNext();
}

private boolean needsWriteMetadataVersion() {
return !wroteVersion && metadataVersion.isAtLeast(minimumBootstrapVersion);
}

@Override
public List<ApiMessageAndVersion> next() {
// Write the metadata.version first
if (!wroteVersion) {
if (metadataVersion.isAtLeast(minimumBootstrapVersion)) {
wroteVersion = true;
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}
if (needsWriteMetadataVersion()) {
wroteVersion = true;
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}

// Then write the rest of the features
if (!hasNext()) throw new NoSuchElementException();
if (!iterator.hasNext()) throw new NoSuchElementException();
Entry<String, Short> entry = iterator.next();
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(entry.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,24 @@ public void testUpdateFeaturesErrorCases() {
);
}

@Test
public void testFeatureControlIteratorWithOldMetadataVersion() throws Exception {
// We require minimum of IBP_3_3_IV0 to write metadata version in the snapshot.

LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager.Builder()
.setLogContext(logContext)
.setSnapshotRegistry(snapshotRegistry)
.setMetadataVersion(MetadataVersion.IBP_3_2_IV0)
.build();

RecordTestUtils.assertBatchIteratorContains(
Collections.emptyList(),
manager.iterator(Long.MAX_VALUE)
);
}

@Test
public void testFeatureControlIterator() throws Exception {
LogContext logContext = new LogContext();
Expand Down

0 comments on commit a692223

Please sign in to comment.