Skip to content

Commit

Permalink
Fix ConcurrentModification exception possible in DataflowExecutionSta…
Browse files Browse the repository at this point in the history
…teSampler (#30993)

Also ensure that the result is not modified by observing it. Previously we
were merging into completed with each observation which appeared unintended.
  • Loading branch information
scwhittle committed Apr 16, 2024
1 parent 277b6c3 commit f6322da
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class DataflowExecutionStateSampler extends ExecutionStateSampler {

private final ConcurrentHashMap<String, DataflowExecutionStateTracker> activeTrackersByWorkId =
new ConcurrentHashMap<>();
// The maps within completeProcessingMetrics should not be modified.
private final ConcurrentHashMap<String, Map<String, IntSummaryStatistics>>
completedProcessingMetrics = new ConcurrentHashMap<>();

Expand All @@ -64,7 +65,7 @@ public void addTracker(ExecutionStateTracker tracker) {
this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
}

private static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
private static void mergeStepStatsMaps(
Map<String, IntSummaryStatistics> map1, Map<String, IntSummaryStatistics> map2) {
for (Entry<String, IntSummaryStatistics> steps : map2.entrySet()) {
map1.compute(
Expand All @@ -77,7 +78,6 @@ private static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
return v;
});
}
return map1;
}

@Override
Expand Down Expand Up @@ -118,13 +118,15 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadataForWorkId(String
}

public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
Map<String, IntSummaryStatistics> result;
DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
if (tracker == null) {
return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
result = new HashMap<>();
} else {
result = tracker.getProcessingTimesByStepCopy();
}
return mergeStepStatsMaps(
completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
tracker.getProcessingTimesByStepCopy());
mergeStepStatsMaps(result, completedProcessingMetrics.getOrDefault(workId, new HashMap<>()));
return result;
}

public void resetForWorkId(String workId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public void testGetCompletedProcessingTimesAndActiveMessageFromActiveTracker() {
assertThat(sampler.getActiveMessageMetadataForWorkId(workId).get(), equalTo(testMetadata));
assertThat(
sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes));
// Repeated calls should not modify the result.
assertThat(
sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes));
}

@Test
Expand Down

0 comments on commit f6322da

Please sign in to comment.