Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #53. Enhance the blockReportThread to continue requesting reports until there are no more DataNodes which match the criteria #54

Merged
merged 2 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public Boolean get() {
LOG.info("Finished requesting datanode containers");

if (launchNameNode) {
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, conf, LOG);
}

waitForCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ public Boolean get() {
}
DynoInfraUtils.waitForNameNodeStartup(namenodeProperties.get(), exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, false,
exitCritera, LOG);
exitCritera, getConf(), LOG);
break;
} catch (IOException ioe) {
LOG.error("Unexpected exception while waiting for NameNode readiness", ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -49,12 +50,23 @@
*/
public class DynoInfraUtils {

public static final String APACHE_DOWNLOAD_MIRROR_KEY = "dyno.apache-mirror";
public static final String DYNO_CONF_PREFIX = "dyno.";
public static final String DYNO_INFRA_PREFIX = DYNO_CONF_PREFIX + "infra.";

public static final String APACHE_DOWNLOAD_MIRROR_KEY = DYNO_CONF_PREFIX + "apache-mirror";
// Set a generic mirror as the default.
public static final String APACHE_DOWNLOAD_MIRROR_DEFAULT = "http://mirrors.ocf.berkeley.edu/apache/";
private static final String APACHE_DOWNLOAD_MIRROR_SUFFIX_FORMAT = "hadoop/common/hadoop-%s/hadoop-%s.tar.gz";
public static final String HADOOP_TAR_FILENAME_FORMAT = "hadoop-%s.tar.gz";

public static final String DATANODE_LIVE_MIN_FRACTION_KEY = DYNO_INFRA_PREFIX + "ready.datanode-min-fraction";
public static final float DATANODE_LIVE_MIN_FRACTION_DEFAULT = 0.99f;
public static final String MISSING_BLOCKS_MAX_FRACTION_KEY = DYNO_INFRA_PREFIX + "ready.missing-blocks-max-fraction";
public static final float MISSING_BLOCKS_MAX_FRACTION_DEFAULT = 0.0001f;
public static final String UNDERREPLICATED_BLOCKS_MAX_FRACTION_KEY =
DYNO_INFRA_PREFIX + "ready.underreplicated-blocks-max-fraction";
public static final float UNDERREPLICATED_BLOCKS_MAX_FRACTION_DEFAULT = 0.01f;

// The JMX bean queries to execute for various beans.
public static final String NAMENODE_STARTUP_PROGRESS_JMX_QUERY = "Hadoop:service=NameNode,name=StartupProgress";
public static final String FSNAMESYSTEM_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystem";
Expand Down Expand Up @@ -197,26 +209,27 @@ static void waitForNameNodeStartup(Properties nameNodeProperties, Supplier<Boole
* @param log Where to log information.
*/
static void waitForNameNodeReadiness(final Properties nameNodeProperties, int numTotalDataNodes,
boolean triggerBlockReports, Supplier<Boolean> shouldExit, final Log log)
boolean triggerBlockReports, Supplier<Boolean> shouldExit, final Configuration conf, final Log log)
throws IOException, InterruptedException {
if (shouldExit.get()) {
return;
}
log.info(String.format("Waiting for %d DataNodes to register with the NameNode...",
(int) (numTotalDataNodes*0.99f)));
int minDataNodes = (int)
(conf.getFloat(DATANODE_LIVE_MIN_FRACTION_KEY, DATANODE_LIVE_MIN_FRACTION_DEFAULT) * numTotalDataNodes);
log.info(String.format("Waiting for %d DataNodes to register with the NameNode...", minDataNodes));
waitForNameNodeJMXValue("Number of live DataNodes", FSNAMESYSTEM_STATE_JMX_QUERY, JMX_LIVE_NODE_COUNT,
numTotalDataNodes*0.99, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log);
minDataNodes, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log);
final int totalBlocks = Integer.parseInt(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_BLOCKS_TOTAL));
Thread blockReportThread = null;
final AtomicBoolean doneWaiting = new AtomicBoolean(false);
if (triggerBlockReports) {
// This will be significantly lower than the actual expected number of blocks because it does not
// take into account replication factor. However, the block reports are pretty binary; either a full
// report has been received or it hasn't. Thus we don't mind the large underestimate here.
final int blockThreshold = totalBlocks / numTotalDataNodes * 2;
// The Configuration object here is based on the host cluster, which may
// have security enabled; we need to disable it to talk to the Dyno NN
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "false");
final DistributedFileSystem dfs =
Expand Down Expand Up @@ -244,6 +257,10 @@ public void run() {
String liveNodeListString =
fetchNameNodeJMXValue(nameNodeProperties, NAMENODE_INFO_JMX_QUERY, JMX_LIVE_NODES_LIST);
Set<String> datanodesToReport = parseStaleDataNodeList(liveNodeListString, blockThreshold, log);
if (datanodesToReport.isEmpty() && doneWaiting.get()) {
log.info("BlockReportThread exiting; all DataNodes have reported blocks");
break;
}
log.info(String.format("Queueing %d Datanodes for block report: %s", datanodesToReport.size(),
Joiner.on(",").join(datanodesToReport)));
DatanodeInfo[] datanodes = dfs.getDataNodeStats();
Expand Down Expand Up @@ -273,26 +290,23 @@ public void run() {
log.info("Block reporting thread exiting");
}
};
blockReportThread.setDaemon(true);
blockReportThread.setUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
blockReportThread.start();
}
log.info("Waiting for MissingBlocks to fall below " + totalBlocks*0.0001 + "...");
float maxMissingBlocks = totalBlocks *
conf.getFloat(MISSING_BLOCKS_MAX_FRACTION_KEY, MISSING_BLOCKS_MAX_FRACTION_DEFAULT);
log.info("Waiting for MissingBlocks to fall below " + maxMissingBlocks + "...");
waitForNameNodeJMXValue("Number of missing blocks", FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS,
totalBlocks*0.0001, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log);
log.info("Waiting for UnderReplicatedBlocks to fall below " + totalBlocks*0.01 + "...");
maxMissingBlocks, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log);
float maxUnderreplicatedBlocks = totalBlocks *
conf.getFloat(UNDERREPLICATED_BLOCKS_MAX_FRACTION_KEY, UNDERREPLICATED_BLOCKS_MAX_FRACTION_DEFAULT);
log.info("Waiting for UnderReplicatedBlocks to fall below " + maxUnderreplicatedBlocks + "...");
waitForNameNodeJMXValue("Number of under replicated blocks", FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_UNDER_REPLICATED_BLOCKS, totalBlocks*0.01, totalBlocks*0.001, true, nameNodeProperties, shouldExit, log);
JMX_UNDER_REPLICATED_BLOCKS, maxUnderreplicatedBlocks, totalBlocks*0.001, true, nameNodeProperties,
shouldExit, log);
log.info("NameNode is ready for use!");
if (blockReportThread != null) {
blockReportThread.interrupt();
log.debug("Interrupted block report thread; joining");
blockReportThread.join(5000);
if (blockReportThread.isAlive()) {
log.debug("Joined block report thread");
} else {
log.warn("Unable to join block report thread after 5s; continuing");
}
}
doneWaiting.set(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Boolean get() {
fail("Unable to fetch NameNode properties");
}

DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, conf, LOG);

// Test that we can successfully write to / read from the cluster
try {
Expand Down