Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #29. Support specifying node labels for the DataNode and NameNode containers. #33

Merged
merged 2 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ class AMOptions {
public static final String NAMENODE_MEMORY_MB_DEFAULT = "2048";
public static final String NAMENODE_VCORES_ARG = "namenode_vcores";
public static final String NAMENODE_VCORES_DEFAULT = "1";
public static final String NAMENODE_NODELABEL_ARG = "namenode_nodelabel";
public static final String NAMENODE_ARGS_ARG = "namenode_args";
public static final String DATANODE_MEMORY_MB_ARG = "datanode_memory_mb";
public static final String DATANODE_MEMORY_MB_DEFAULT = "2048";
public static final String DATANODE_VCORES_ARG = "datanode_vcores";
public static final String DATANODE_VCORES_DEFAULT = "1";
public static final String DATANODE_NODELABEL_ARG = "datanode_nodelabel";
public static final String DATANODE_ARGS_ARG = "datanode_args";
public static final String NAMENODE_METRICS_PERIOD_ARG = "namenode_metrics_period";
public static final String NAMENODE_METRICS_PERIOD_DEFAULT = "60";
Expand All @@ -41,28 +43,33 @@ class AMOptions {
private final int datanodeMemoryMB;
private final int datanodeVirtualCores;
private final String datanodeArgs;
private final String datanodeNodeLabelExpression;
private final int datanodesPerCluster;
private final String datanodeLaunchDelay;
private final int namenodeMemoryMB;
private final int namenodeVirtualCores;
private final String namenodeArgs;
private final String namenodeNodeLabelExpression;
private final int namenodeMetricsPeriod;
// Original shellEnv as passed in through arguments
private final Map<String, String> originalShellEnv;
// Extended shellEnv including custom environment variables
private final Map<String, String> shellEnv;

AMOptions(int datanodeMemoryMB, int datanodeVirtualCores, String datanodeArgs,
int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB, int namenodeVirtualCores,
String namenodeArgs, int namenodeMetricsPeriod, Map<String, String> shellEnv) {
String datanodeNodeLabelExpression, int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB,
int namenodeVirtualCores, String namenodeArgs, String namenodeNodeLabelExpression, int namenodeMetricsPeriod,
Map<String, String> shellEnv) {
this.datanodeMemoryMB = datanodeMemoryMB;
this.datanodeVirtualCores = datanodeVirtualCores;
this.datanodeArgs = datanodeArgs;
this.datanodeNodeLabelExpression = datanodeNodeLabelExpression;
this.datanodesPerCluster = datanodesPerCluster;
this.datanodeLaunchDelay = datanodeLaunchDelay;
this.namenodeMemoryMB = namenodeMemoryMB;
this.namenodeVirtualCores = namenodeVirtualCores;
this.namenodeArgs = namenodeArgs;
this.namenodeNodeLabelExpression = namenodeNodeLabelExpression;
this.namenodeMetricsPeriod = namenodeMetricsPeriod;
this.originalShellEnv = shellEnv;
this.shellEnv = new HashMap<>(this.originalShellEnv);
Expand Down Expand Up @@ -100,13 +107,19 @@ void addToVargs(List<String> vargs) {
if (!datanodeArgs.isEmpty()) {
vargs.add("--" + DATANODE_ARGS_ARG + " \\\"" + datanodeArgs + "\\\"");
}
if (!datanodeNodeLabelExpression.isEmpty()) {
vargs.add("--" + DATANODE_NODELABEL_ARG + " \\\"" + datanodeNodeLabelExpression + "\\\"");
}
vargs.add("--" + DATANODES_PER_CLUSTER_ARG + " " + String.valueOf(datanodesPerCluster));
vargs.add("--" + DATANODE_LAUNCH_DELAY_ARG + " " + datanodeLaunchDelay);
vargs.add("--" + NAMENODE_MEMORY_MB_ARG + " " + String.valueOf(namenodeMemoryMB));
vargs.add("--" + NAMENODE_VCORES_ARG + " " + String.valueOf(namenodeVirtualCores));
if (!namenodeArgs.isEmpty()) {
vargs.add("--" + NAMENODE_ARGS_ARG + " \\\"" + namenodeArgs + "\\\"");
}
if (!namenodeNodeLabelExpression.isEmpty()) {
vargs.add("--" + NAMENODE_NODELABEL_ARG + " \\\"" + namenodeNodeLabelExpression + "\\\"");
}
vargs.add("--" + NAMENODE_METRICS_PERIOD_ARG + " " + String.valueOf(namenodeMetricsPeriod));
for (Map.Entry<String, String> entry : originalShellEnv.entrySet()) {
vargs.add("--" + SHELL_ENV_ARG + " " + entry.getKey() + "=" + entry.getValue());
Expand All @@ -121,6 +134,10 @@ int getDataNodeVirtualCores() {
return datanodeVirtualCores;
}

/**
 * @return the YARN node label expression to request for DataNode containers;
 *     empty string means no label (default partition).
 */
String getDataNodeNodeLabelExpression() {
return datanodeNodeLabelExpression;
}

/**
 * @return how many simulated DataNodes are run within each YARN container.
 */
int getDataNodesPerCluster() {
return datanodesPerCluster;
}
Expand All @@ -141,6 +158,10 @@ int getNameNodeVirtualCores() {
return namenodeVirtualCores;
}

/**
 * @return the YARN node label expression to request for the NameNode container;
 *     empty string means no label (default partition).
 */
String getNameNodeNodeLabelExpression() {
return namenodeNodeLabelExpression;
}

/**
 * @return the extended shell environment (original env plus custom variables).
 */
// NOTE(review): this returns the internal map directly; callers could mutate it.
// Consider Collections.unmodifiableMap(shellEnv) if no caller relies on mutation — verify.
Map<String, String> getShellEnv() {
return shellEnv;
}
Expand All @@ -160,6 +181,8 @@ static void setOptions(Options opts) {
"Ignored unless the NameNode is run within YARN.");
opts.addOption(NAMENODE_ARGS_ARG, true,
"Additional arguments to add when starting the NameNode. Ignored unless the NameNode is run within YARN.");
opts.addOption(NAMENODE_NODELABEL_ARG, true,
"The node label to specify for the container to use to run the NameNode.");
opts.addOption(NAMENODE_METRICS_PERIOD_ARG, true,
"The period in seconds for the NameNode's metrics to be emitted to file; if <=0, " +
"disables this functionality. Otherwise, a metrics file will be stored in the " +
Expand All @@ -169,6 +192,8 @@ static void setOptions(Options opts) {
opts.addOption(DATANODE_VCORES_ARG, true,
"Amount of virtual cores to be requested to run the DNs (default " + DATANODE_VCORES_DEFAULT + ")");
opts.addOption(DATANODE_ARGS_ARG, true, "Additional arguments to add when starting the DataNodes.");
opts.addOption(DATANODE_NODELABEL_ARG, true,
"The node label to specify for the container to use to run the DataNode.");
opts.addOption(DATANODES_PER_CLUSTER_ARG, true, "How many simulated DataNodes to run within each YARN container " +
"(default " + DATANODES_PER_CLUSTER_DEFAULT + ")");
opts.addOption(DATANODE_LAUNCH_DELAY_ARG, true, "The period over which to launch the DataNodes; this will " +
Expand Down Expand Up @@ -207,11 +232,13 @@ static AMOptions initFromParser(CommandLine cliParser) {
Integer.parseInt(cliParser.getOptionValue(DATANODE_MEMORY_MB_ARG, DATANODE_MEMORY_MB_DEFAULT)),
Integer.parseInt(cliParser.getOptionValue(DATANODE_VCORES_ARG, DATANODE_VCORES_DEFAULT)),
cliParser.getOptionValue(DATANODE_ARGS_ARG, ""),
cliParser.getOptionValue(DATANODE_NODELABEL_ARG, ""),
Integer.parseInt(cliParser.getOptionValue(DATANODES_PER_CLUSTER_ARG, DATANODES_PER_CLUSTER_DEFAULT)),
cliParser.getOptionValue(DATANODE_LAUNCH_DELAY_ARG, DATANODE_LAUNCH_DELAY_DEFAULT),
Integer.parseInt(cliParser.getOptionValue(NAMENODE_MEMORY_MB_ARG, NAMENODE_MEMORY_MB_DEFAULT)),
Integer.parseInt(cliParser.getOptionValue(NAMENODE_VCORES_ARG, NAMENODE_VCORES_DEFAULT)),
cliParser.getOptionValue(NAMENODE_ARGS_ARG, ""),
cliParser.getOptionValue(NAMENODE_NODELABEL_ARG, ""),
Integer.parseInt(cliParser.getOptionValue(NAMENODE_METRICS_PERIOD_ARG, NAMENODE_METRICS_PERIOD_DEFAULT)),
originalShellEnv);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ public Boolean get() {

Optional<Properties> namenodeProperties = Optional.absent();
if (launchNameNode) {
ContainerRequest nnContainerRequest = setupContainerAskForRM(
amOptions.getNameNodeMemoryMB(), amOptions.getNameNodeVirtualCores(), 0);
ContainerRequest nnContainerRequest = setupContainerAskForRM(amOptions.getNameNodeMemoryMB(),
amOptions.getNameNodeVirtualCores(), 0, amOptions.getNameNodeNodeLabelExpression());
LOG.info("Requested NameNode ask: " + nnContainerRequest.toString());
amRMClient.addContainerRequest(nnContainerRequest);

Expand Down Expand Up @@ -333,7 +333,7 @@ public Boolean get() {
amOptions.getDataNodeVirtualCores() + " vcores, ");
for (int i = 0; i < numTotalDataNodeContainers; ++i) {
ContainerRequest datanodeAsk = setupContainerAskForRM(amOptions.getDataNodeMemoryMB(),
amOptions.getDataNodeVirtualCores(), 1);
amOptions.getDataNodeVirtualCores(), 1, amOptions.getDataNodeNodeLabelExpression());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the behavior if this is empty?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is "", so it uses the default (unlabeled) node label.

amRMClient.addContainerRequest(datanodeAsk);
LOG.debug("Requested datanode ask: " + datanodeAsk.toString());
}
Expand Down Expand Up @@ -789,7 +789,7 @@ private boolean isDataNode(ContainerId containerId) {
*
* @return the setup ResourceRequest to be sent to RM
*/
private ContainerRequest setupContainerAskForRM(int memory, int vcores, int priority) {
private ContainerRequest setupContainerAskForRM(int memory, int vcores, int priority, String nodeLabel) {
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(priority);

Expand All @@ -799,7 +799,7 @@ private ContainerRequest setupContainerAskForRM(int memory, int vcores, int prio
capability.setMemory(memory);
capability.setVirtualCores(vcores);

return new ContainerRequest(capability, null, null, pri);
return new ContainerRequest(capability, null, null, pri, true, nodeLabel);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import java.io.File;
Expand All @@ -14,8 +15,11 @@
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
Expand All @@ -38,10 +42,18 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -81,7 +93,7 @@ public class TestDynamometerInfra {

private static final Log LOG = LogFactory.getLog(TestDynamometerInfra.class);

private static final int MINICLUSTER_NUM_NMS = 1;
private static final int MINICLUSTER_NUM_NMS = 3;
private static final int MINICLUSTER_NUM_DNS = 1;

private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path";
Expand All @@ -90,6 +102,9 @@ public class TestDynamometerInfra {
private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740";
private static final String VERSION_FILENAME = "VERSION";

private static final String NAMENODE_NODELABEL = "dyno_namenode";
private static final String DATANODE_NODELABEL = "dyno_datanode";

private static MiniDFSCluster miniDFSCluster;
private static MiniYARNCluster miniYARNCluster;
private static YarnClient yarnClient;
Expand Down Expand Up @@ -144,12 +159,23 @@ public static void setupClass() throws Exception {
fail("Unable to execute tar to expand Hadoop binary");
}

conf.setBoolean("yarn.minicluster.fixed.ports", true);
conf.setBoolean("yarn.minicluster.use-rpc", true);
conf.setInt("yarn.scheduler.minimum-allocation-mb", 128);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
for (String q : new String[] { "root", "root.default" } ) {
conf.setInt(CapacitySchedulerConfiguration.PREFIX + q + "." + CapacitySchedulerConfiguration.CAPACITY, 100);
String accessibleNodeLabelPrefix = CapacitySchedulerConfiguration.PREFIX + q + "." +
CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS;
conf.set(accessibleNodeLabelPrefix, CapacitySchedulerConfiguration.ALL_ACL);
conf.setInt(
accessibleNodeLabelPrefix + "." + DATANODE_NODELABEL + "." + CapacitySchedulerConfiguration.CAPACITY, 100);
conf.setInt(
accessibleNodeLabelPrefix + "." + NAMENODE_NODELABEL + "." + CapacitySchedulerConfiguration.CAPACITY, 100);
}
// This is necessary to have the RM respect our vcore allocation request
conf.set("yarn.scheduler.capacity.resource-calculator",
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
conf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class, ResourceCalculator.class);
miniYARNCluster = new MiniYARNCluster(
TestDynamometerInfra.class.getName(), 1, MINICLUSTER_NUM_NMS, 1, 1);
miniYARNCluster.init(conf);
Expand All @@ -176,6 +202,13 @@ public static void setupClass() throws Exception {
uploadFsimageResourcesToHDFS(hadoopBinVersion);

miniYARNCluster.waitForNodeManagersToConnect(30000);

RMNodeLabelsManager nodeLabelManager = miniYARNCluster.getResourceManager().getRMContext().getNodeLabelManager();
nodeLabelManager.addToCluserNodeLabels(Sets.newHashSet(NAMENODE_NODELABEL, DATANODE_NODELABEL));
Map<NodeId, Set<String>> nodeLabels = new HashMap<>();
nodeLabels.put(miniYARNCluster.getNodeManager(0).getNMContext().getNodeId(), Sets.newHashSet(NAMENODE_NODELABEL));
nodeLabels.put(miniYARNCluster.getNodeManager(1).getNMContext().getNodeId(), Sets.newHashSet(DATANODE_NODELABEL));
nodeLabelManager.addLabelsToNode(nodeLabels);
}

@AfterClass
Expand Down Expand Up @@ -228,8 +261,10 @@ public void run() {
"-" + Client.HADOOP_BINARY_PATH_ARG, hadoopTarballPath.getAbsolutePath(),
"-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2",
"-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128",
"-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL,
"-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256",
"-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1",
"-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL,
"-" + AMOptions.SHELL_ENV_ARG, "HADOOP_HOME=" + getHadoopHomeLocation(),
"-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop",
"-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
Expand Down Expand Up @@ -296,6 +331,15 @@ public Boolean get() {
throw e;
}

Map<ContainerId, Container> namenodeContainers = miniYARNCluster.getNodeManager(0).getNMContext().getContainers();
Map<ContainerId, Container> datanodeContainers = miniYARNCluster.getNodeManager(1).getNMContext().getContainers();
Map<ContainerId, Container> amContainers = miniYARNCluster.getNodeManager(2).getNMContext().getContainers();
assertEquals(1, namenodeContainers.size());
assertEquals(2, namenodeContainers.keySet().iterator().next().getContainerId());
assertEquals(2, datanodeContainers.size());
assertEquals(1, amContainers.size());
assertEquals(1, amContainers.keySet().iterator().next().getContainerId());

LOG.info("Waiting for workload job to start and complete");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
Expand Down