Skip to content

Commit

Permalink
YARN-3800. Reduce storage footprint for ReservationAllocation. Contri…
Browse files Browse the repository at this point in the history
…buted by Anubhav Dhoot.
  • Loading branch information
carlo curino authored and carlo curino committed Jul 9, 2015
1 parent f4ca530 commit 0e602fa
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 123 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED
YARN-3827. Migrate YARN native build to new CMake framework (Alan Burlison
via Colin P. McCabe)

YARN-3800. Reduce storage footprint for ReservationAllocation. (Anubhav Dhoot
via curino)

OPTIMIZATIONS

YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private boolean computeAllocation(ReservationId reservationId, String user,
long curDeadline = deadline;
long oldDeadline = -1;

Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
Map<ReservationInterval, Resource> allocations =
new HashMap<ReservationInterval, Resource>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
plan.getMinimumAllocation());
Expand All @@ -108,6 +108,8 @@ private boolean computeAllocation(ReservationId reservationId, String user,
ReservationRequestInterpreter type = contract.getReservationRequests()
.getInterpreter();

boolean hasGang = false;

// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
Expand All @@ -117,8 +119,10 @@ private boolean computeAllocation(ReservationId reservationId, String user,
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);

hasGang |= currentReservationStage.getConcurrency() > 1;

// run allocation for a single stage
Map<ReservationInterval, ReservationRequest> curAlloc =
Map<ReservationInterval, Resource> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);

Expand Down Expand Up @@ -178,8 +182,7 @@ private boolean computeAllocation(ReservationId reservationId, String user,

// create reservation with above allocations if not null/empty

ReservationRequest ZERO_RES =
ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
Resource ZERO_RES = Resource.newInstance(0, 0);

long firstStartTime = findEarliestTime(allocations.keySet());

Expand All @@ -200,7 +203,7 @@ private boolean computeAllocation(ReservationId reservationId, String user,
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
plan.getResourceCalculator(), plan.getMinimumAllocation());
plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
Expand Down Expand Up @@ -242,13 +245,13 @@ private void validateInput(Plan plan, ReservationRequest rr,
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
*/
private Map<ReservationInterval, ReservationRequest> placeSingleStage(
private Map<ReservationInterval, Resource> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {

Map<ReservationInterval, ReservationRequest> allocationRequests =
new HashMap<ReservationInterval, ReservationRequest>();
Map<ReservationInterval, Resource> allocationRequests =
new HashMap<ReservationInterval, Resource>();

// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
Expand Down Expand Up @@ -322,7 +325,7 @@ private Map<ReservationInterval, ReservationRequest> placeSingleStage(

ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
ReservationRequest reservationRes =
ReservationRequest reservationRequest =
ReservationRequest.newInstance(rr.getCapability(),
rr.getConcurrency() * maxGang, rr.getConcurrency(),
rr.getDuration());
Expand All @@ -331,6 +334,8 @@ private Map<ReservationInterval, ReservationRequest> placeSingleStage(
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
final Resource reservationRes = ReservationSystemUtil.toResource(
reservationRequest);
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);

Expand All @@ -350,7 +355,7 @@ private Map<ReservationInterval, ReservationRequest> placeSingleStage(
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
for (Map.Entry<ReservationInterval, Resource> tempAllocation :
allocationRequests.entrySet()) {
tempAssigned.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
Expand Down Expand Up @@ -110,7 +109,7 @@ public QueueMetrics getQueueMetrics() {

private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
Expand All @@ -119,7 +118,7 @@ private void incrementAllocation(ReservationAllocation reservation) {
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
userResourceAlloc.put(user, resAlloc);
}
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
Expand All @@ -128,11 +127,11 @@ private void incrementAllocation(ReservationAllocation reservation) {

private void decrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand All @@ -40,7 +39,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
private final ReservationDefinition contract;
private final long startTime;
private final long endTime;
private final Map<ReservationInterval, ReservationRequest> allocationRequests;
private final Map<ReservationInterval, Resource> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;

Expand All @@ -49,22 +48,29 @@ class InMemoryReservationAllocation implements ReservationAllocation {
InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, ReservationRequest> allocationRequests,
Map<ReservationInterval, Resource> allocations,
ResourceCalculator calculator, Resource minAlloc) {
this(reservationID, contract, user, planName, startTime, endTime,
allocations, calculator, minAlloc, false);
}

InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, Resource> allocations,
ResourceCalculator calculator, Resource minAlloc, boolean hasGang) {
this.contract = contract;
this.startTime = startTime;
this.endTime = endTime;
this.reservationID = reservationID;
this.user = user;
this.allocationRequests = allocationRequests;
this.allocationRequests = allocations;
this.planName = planName;
this.hasGang = hasGang;
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
for (Map.Entry<ReservationInterval, Resource> r : allocations
.entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
if (r.getValue().getConcurrency() > 1) {
hasGang = true;
}
}
}

Expand All @@ -89,7 +95,7 @@ public long getEndTime() {
}

@Override
public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
public Map<ReservationInterval, Resource> getAllocationRequests() {
return Collections.unmodifiableMap(allocationRequests);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand All @@ -31,9 +30,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

Expand Down Expand Up @@ -80,14 +77,11 @@ private boolean isSameAsNext(Long key, Resource capacity) {
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param capacity the resource to be added
* @param totCap the resource to be added
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
Resource totCap) {
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
Expand Down Expand Up @@ -142,45 +136,16 @@ public boolean addInterval(ReservationInterval reservationInterval,
}
}

/**
* Add multiple resources for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param ReservationRequests the resources to be added
* @param clusterResource the total resources in the cluster
* @return true if addition is successful, false otherwise
*/
public boolean addCompositeInterval(ReservationInterval reservationInterval,
List<ReservationRequest> ReservationRequests, Resource clusterResource) {
ReservationRequest aggregateReservationRequest =
Records.newRecord(ReservationRequest.class);
Resource capacity = Resource.newInstance(0, 0);
for (ReservationRequest ReservationRequest : ReservationRequests) {
Resources.addTo(capacity, Resources.multiply(
ReservationRequest.getCapability(),
ReservationRequest.getNumContainers()));
}
aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
.divide(resourceCalculator, clusterResource, capacity, minAlloc)));
aggregateReservationRequest.setCapability(minAlloc);

return addInterval(reservationInterval, aggregateReservationRequest);
}

/**
* Removes a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* removed
* @param capacity the resource to be removed
* @param totCap the resource to be removed
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
Resource totCap) {
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;

/**
Expand Down Expand Up @@ -71,7 +70,7 @@ public interface ReservationAllocation extends
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
public Map<ReservationInterval, Resource> getAllocationRequests();

/**
* Return a string identifying the plan to which the reservation belongs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.HashMap;
import java.util.Map;

final class ReservationSystemUtil {

private ReservationSystemUtil() {
// not called
}

public static Resource toResource(ReservationRequest request) {
Resource resource = Resources.multiply(request.getCapability(),
(float) request.getNumContainers());
return resource;
}

public static Map<ReservationInterval, Resource> toResources(
Map<ReservationInterval, ReservationRequest> allocations) {
Map<ReservationInterval, Resource> resources =
new HashMap<ReservationInterval, Resource>();
for (Map.Entry<ReservationInterval, ReservationRequest> entry :
allocations.entrySet()) {
resources.put(entry.getKey(),
toResource(entry.getValue()));
}
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,15 @@ public static ReservationDefinition generateBigRR(Random rand, long i) {
return rr;
}

public static Map<ReservationInterval, ReservationRequest> generateAllocation(
public static Map<ReservationInterval, Resource> generateAllocation(
long startTime, long step, int[] alloc) {
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
Map<ReservationInterval, Resource> req =
new TreeMap<ReservationInterval, Resource>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
* step), ReservationRequest.newInstance(
Resource.newInstance(1024, 1), alloc[i]));
* step), ReservationSystemUtil.toResource(ReservationRequest
.newInstance(
Resource.newInstance(1024, 1), alloc[i])));
}
return req;
}
Expand Down
Loading

0 comments on commit 0e602fa

Please sign in to comment.