Skip to content

Commit

Permalink
YARN-8481. AMRMProxyPolicies should accept heartbeat response from ne…
Browse files Browse the repository at this point in the history
…w/unknown subclusters. Contributed by Botong Huang.
  • Loading branch information
gifuma authored and Giovanni Matteo Fumarola committed Jun 29, 2018
1 parent 0317961 commit 9cc0c42
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;

Expand All @@ -40,8 +37,6 @@
*/
public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {

private Set<SubClusterId> knownClusterIds = new HashSet<>();

@Override
public void reinitialize(
FederationPolicyInitializationContext policyContext)
Expand All @@ -65,7 +60,6 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
// simply broadcast the resource request to all sub-clusters
for (SubClusterId subClusterId : activeSubclusters.keySet()) {
answer.put(subClusterId, resourceRequests);
knownClusterIds.add(subClusterId);
}

return answer;
Expand All @@ -74,11 +68,6 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
if (!knownClusterIds.contains(subClusterId)) {
throw new UnknownSubclusterException(
"The response is received from a subcluster that is unknown to this "
+ "policy.");
}
// stateless policy does not care about responses
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
Expand All @@ -38,8 +36,6 @@
*/
public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy {

private Set<SubClusterId> knownClusterIds = new HashSet<>();

@Override
public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,16 @@ public void testSplitAllocateRequest() throws Exception {
}

@Test
public void testNotifyOfResponse() throws Exception {
public void testNotifyOfResponseFromUnknownSubCluster() throws Exception {
String[] hosts = new String[] {"host1", "host2" };
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);

try {
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
Assert.fail();
} catch (FederationPolicyException f) {
System.out.println("Expected: " + f.getMessage());
}
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));

((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));
Expand Down

0 comments on commit 9cc0c42

Please sign in to comment.