Skip to content

Commit

Permalink
Closes linkedin#48. Make the percentage-based thresholds for infra ap…
Browse files Browse the repository at this point in the history
…p NameNode readiness configurable. (linkedin#49)
  • Loading branch information
xkrogen authored and sunchao committed Aug 16, 2018
1 parent 033a596 commit b541e2d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public Boolean get() {
LOG.info("Finished requesting datanode containers");

if (launchNameNode) {
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, conf, LOG);
}

waitForCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ public Boolean get() {
}
DynoInfraUtils.waitForNameNodeStartup(namenodeProperties.get(), exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, false,
exitCritera, LOG);
exitCritera, getConf(), LOG);
break;
} catch (IOException ioe) {
LOG.error("Unexpected exception while waiting for NameNode readiness", ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,23 @@
*/
public class DynoInfraUtils {

public static final String APACHE_DOWNLOAD_MIRROR_KEY = "dyno.apache-mirror";
public static final String DYNO_CONF_PREFIX = "dyno.";
public static final String DYNO_INFRA_PREFIX = DYNO_CONF_PREFIX + "infra.";

public static final String APACHE_DOWNLOAD_MIRROR_KEY = DYNO_CONF_PREFIX + "apache-mirror";
// Set a generic mirror as the default.
public static final String APACHE_DOWNLOAD_MIRROR_DEFAULT = "http://mirrors.ocf.berkeley.edu/apache/";
private static final String APACHE_DOWNLOAD_MIRROR_SUFFIX_FORMAT = "hadoop/common/hadoop-%s/hadoop-%s.tar.gz";
public static final String HADOOP_TAR_FILENAME_FORMAT = "hadoop-%s.tar.gz";

public static final String DATANODE_LIVE_MIN_FRACTION_KEY = DYNO_INFRA_PREFIX + "ready.datanode-min-fraction";
public static final float DATANODE_LIVE_MIN_FRACTION_DEFAULT = 0.99f;
public static final String MISSING_BLOCKS_MAX_FRACTION_KEY = DYNO_INFRA_PREFIX + "ready.missing-blocks-max-fraction";
public static final float MISSING_BLOCKS_MAX_FRACTION_DEFAULT = 0.0001f;
public static final String UNDERREPLICATED_BLOCKS_MAX_FRACTION_KEY =
DYNO_INFRA_PREFIX + "ready.underreplicated-blocks-max-fraction";
public static final float UNDERREPLICATED_BLOCKS_MAX_FRACTION_DEFAULT = 0.01f;

// The JMX bean queries to execute for various beans.
public static final String NAMENODE_STARTUP_PROGRESS_JMX_QUERY = "Hadoop:service=NameNode,name=StartupProgress";
public static final String FSNAMESYSTEM_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystem";
Expand Down Expand Up @@ -197,15 +208,16 @@ static void waitForNameNodeStartup(Properties nameNodeProperties, Supplier<Boole
* @param log Where to log inormation.
*/
static void waitForNameNodeReadiness(final Properties nameNodeProperties, int numTotalDataNodes,
boolean triggerBlockReports, Supplier<Boolean> shouldExit, final Log log)
boolean triggerBlockReports, Supplier<Boolean> shouldExit, final Configuration conf, final Log log)
throws IOException, InterruptedException {
if (shouldExit.get()) {
return;
}
log.info(String.format("Waiting for %d DataNodes to register with the NameNode...",
(int) (numTotalDataNodes*0.99f)));
int minDataNodes = (int)
(conf.getFloat(DATANODE_LIVE_MIN_FRACTION_KEY, DATANODE_LIVE_MIN_FRACTION_DEFAULT) * numTotalDataNodes);
log.info(String.format("Waiting for %d DataNodes to register with the NameNode...", minDataNodes));
waitForNameNodeJMXValue("Number of live DataNodes", FSNAMESYSTEM_STATE_JMX_QUERY, JMX_LIVE_NODE_COUNT,
numTotalDataNodes*0.99, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log);
minDataNodes, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log);
final int totalBlocks = Integer.parseInt(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_BLOCKS_TOTAL));
Thread blockReportThread = null;
Expand All @@ -216,7 +228,6 @@ static void waitForNameNodeReadiness(final Properties nameNodeProperties, int nu
final int blockThreshold = totalBlocks / numTotalDataNodes * 2;
// The Configuration object here is based on the host cluster, which may
// have security enabled; we need to disable it to talk to the Dyno NN
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "false");
final DistributedFileSystem dfs =
Expand Down Expand Up @@ -276,12 +287,17 @@ public void run() {
blockReportThread.setUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
blockReportThread.start();
}
log.info("Waiting for MissingBlocks to fall below " + totalBlocks*0.0001 + "...");
float maxMissingBlocks = totalBlocks *
conf.getFloat(MISSING_BLOCKS_MAX_FRACTION_KEY, MISSING_BLOCKS_MAX_FRACTION_DEFAULT);
log.info("Waiting for MissingBlocks to fall below " + maxMissingBlocks + "...");
waitForNameNodeJMXValue("Number of missing blocks", FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS,
totalBlocks*0.0001, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log);
log.info("Waiting for UnderReplicatedBlocks to fall below " + totalBlocks*0.01 + "...");
maxMissingBlocks, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log);
float maxUnderreplicatedBlocks = totalBlocks *
conf.getFloat(UNDERREPLICATED_BLOCKS_MAX_FRACTION_KEY, UNDERREPLICATED_BLOCKS_MAX_FRACTION_DEFAULT);
log.info("Waiting for UnderReplicatedBlocks to fall below " + maxUnderreplicatedBlocks + "...");
waitForNameNodeJMXValue("Number of under replicated blocks", FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_UNDER_REPLICATED_BLOCKS, totalBlocks*0.01, totalBlocks*0.001, true, nameNodeProperties, shouldExit, log);
JMX_UNDER_REPLICATED_BLOCKS, maxUnderreplicatedBlocks, totalBlocks*0.001, true, nameNodeProperties,
shouldExit, log);
log.info("NameNode is ready for use!");
if (blockReportThread != null) {
blockReportThread.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Boolean get() {
fail("Unable to fetch NameNode properties");
}

DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, conf, LOG);

// Test that we can successfully write to / read from the cluster
try {
Expand Down

0 comments on commit b541e2d

Please sign in to comment.