
Commit

Refactor and clean up
Christopher Gregorian committed Feb 14, 2019
1 parent 3bc3998 commit 67873d7
Showing 9 changed files with 63 additions and 154 deletions.
@@ -12,7 +12,6 @@
import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver;
import java.io.File;
@@ -39,7 +38,6 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -924,12 +922,8 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) {
for (Map.Entry<String, String> configPair : workloadExtraConfigs.entrySet()) {
workloadConf.set(configPair.getKey(), configPair.getValue());
}
Class <? extends WorkloadReducer> reducerClass = null;
if (workloadOutputPath != null) {
reducerClass = AuditReplayReducer.class;
}
workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(),
workloadStartTime, AuditReplayMapper.class, reducerClass);
workloadStartTime, AuditReplayMapper.class);
workloadJob.submit();
while (!isCompleted(infraAppState) && !isCompleted(workloadAppState)) {
workloadJob.monitorAndPrintJob();
@@ -22,11 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -47,7 +43,6 @@ public class WorkloadDriver extends Configured implements Tool {
public static final String START_TIME_OFFSET_DEFAULT = "1m";
public static final String NN_URI = "nn_uri";
public static final String MAPPER_CLASS_NAME = "mapper_class_name";
public static final String REDUCER_CLASS_NAME = "reducer_class_name";

public int run(String[] args) throws Exception {
Option helpOption = new Option("h", "help", false, "Shows this message. Additionally specify the " + MAPPER_CLASS_NAME
@@ -68,11 +63,6 @@ public int run(String[] args) throws Exception {
"1. AuditReplayMapper \n" + "2. CreateFileMapper \nFully specified class names are also supported.")
.isRequired().create(MAPPER_CLASS_NAME);
options.addOption(mapperClassOption);
Option reducerClassOption = OptionBuilder.withArgName("Reducer ClassName").hasArg().withDescription(
"Class name of the reducer (optional); must be a Reducer subclass. Reducers supported currently: \n" +
"1. AuditReplayReducer \nFully specified class names are also supported.")
.create(REDUCER_CLASS_NAME);
options.addOption(reducerClassOption);

Options helpOptions = new Options();
helpOptions.addOption(helpOption);
@@ -110,23 +100,15 @@ public int run(String[] args) throws Exception {
System.err.println(getMapperUsageInfo(cli.getOptionValue(MAPPER_CLASS_NAME)));
return 1;
}
Class<? extends WorkloadReducer> reducerClass = null;
if (cli.getOptionValue(REDUCER_CLASS_NAME) != null) {
reducerClass = getReducerClass(cli.getOptionValue(REDUCER_CLASS_NAME));
if (!reducerClass.newInstance().verifyConfigurations(getConf())) {
System.err.println("Incorrect config for " + reducerClass.getName());
return 1;
}
}

Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass, reducerClass);
Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass);

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}

public static Job getJobForSubmission(Configuration baseConf, String nnURI, long startTimestampMs,
Class<? extends WorkloadMapper> mapperClass, Class<? extends WorkloadReducer> reducerClass) throws IOException, ClassNotFoundException,
Class<? extends WorkloadMapper> mapperClass) throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
Configuration conf = new Configuration(baseConf);
conf.set(NN_URI, nnURI);
@@ -139,23 +121,8 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long
Job job = Job.getInstance(conf, "Dynamometer Workload Driver");
job.setJarByClass(mapperClass);
job.setMapperClass(mapperClass);
job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf));
if (reducerClass == null) {
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
} else {
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(reducerClass.newInstance().getOutputFormat(conf));
job.setReducerClass(reducerClass);
}

mapperClass.newInstance().configureJob(job);

return job;
}
@@ -177,18 +144,6 @@ private Class<? extends WorkloadMapper> getMapperClass(String className) throws
return (Class<? extends WorkloadMapper>) mapperClass;
}

private Class<? extends WorkloadReducer> getReducerClass(String className) throws ClassNotFoundException {
if (!className.contains(".")) {
className = WorkloadDriver.class.getPackage().getName() + "." + className;
}
Class<?> mapperClass = getConf().getClassByName(className);
if (!WorkloadReducer.class.isAssignableFrom(mapperClass)) {
throw new IllegalArgumentException(className + " is not a subclass of " +
WorkloadReducer.class.getCanonicalName());
}
return (Class<? extends WorkloadReducer>) mapperClass;
}

private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException,
InstantiationException, IllegalAccessException {
WorkloadMapper<?, ?, ?, ?> mapper = getMapperClass(mapperClassName).newInstance();
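For context, a minimal caller-side sketch of the trimmed getJobForSubmission signature (the wrapper class, NameNode URI, and start offset below are placeholders, not values from this commit): the reducer parameter is gone, and each mapper now wires its own reducer and output format through configureJob.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;

public class SubmitWorkloadSketch {
  public static void main(String[] args) throws Exception {
    // Any workload-specific keys (for AuditReplayMapper: auditreplay.input-path and,
    // after this commit, auditreplay.output-path) must already be set on this conf.
    Configuration conf = new Configuration();
    long startTimestampMs = System.currentTimeMillis() + 60_000L; // placeholder: start in one minute
    // Only the mapper class is passed now; the mapper's configureJob hook attaches
    // its reducer and output format, so no reducer argument is needed here.
    Job job = WorkloadDriver.getJobForSubmission(conf, "hdfs://namenode:9000/", // placeholder NameNode URI
        startTimestampMs, AuditReplayMapper.class);
    job.waitForCompletion(true);
  }
}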
@@ -8,7 +8,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;


/**
@@ -21,7 +23,7 @@ public abstract class WorkloadMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends M
/**
* Return the input class to be used by this mapper.
*/
public Class<? extends InputFormat> getInputFormat(Configuration conf) {
public static Class<? extends InputFormat> getInputFormat(Configuration conf) {
return TimedInputFormat.class;
}

@@ -41,4 +43,13 @@ public Class<? extends InputFormat> getInputFormat(Configuration conf) {
*/
public abstract boolean verifyConfigurations(Configuration conf);

/**
* Configure the job with mapper-specific settings: input/output formats, key/value classes, and an optional reducer.
*/
public void configureJob(Job job) {
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
}
}
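As an illustration of the new hook, a hypothetical workload (not part of this commit; the type parameters simply mirror AuditReplayMapper's) that needs an aggregation step can now override configureJob itself rather than having the driver know about reducers, while map-only workloads just inherit the default above:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import com.linkedin.dynamometer.workloadgenerator.WorkloadMapper;

// Hypothetical workload mapper, for illustration only: it emits (line, 1) pairs and
// overrides configureJob to attach a summing reducer instead of the map-only default.
public class LineCountMapper extends WorkloadMapper<LongWritable, Text, Text, LongWritable> {

  @Override
  public String getDescription() {
    return "Hypothetical mapper that counts occurrences of each input line.";
  }

  @Override
  public List<String> getConfigDescriptions() {
    return Collections.emptyList();
  }

  @Override
  public boolean verifyConfigurations(Configuration conf) {
    return true; // no extra configuration required
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(value, new LongWritable(1L));
  }

  @Override
  public void configureJob(Job job) {
    // Input format is left at the framework default (TextInputFormat), which
    // matches the LongWritable/Text input types declared above.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setNumReduceTasks(1);
    job.setReducerClass(LongSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path("/tmp/line-counts")); // placeholder output path
  }
}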

This file was deleted.

@@ -18,16 +18,19 @@
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.READ;
import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.WRITE;
@@ -58,6 +61,7 @@
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, Text, LongWritable> {

public static final String INPUT_PATH_KEY = "auditreplay.input-path";
public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
public static final int NUM_THREADS_DEFAULT = 1;
public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
@@ -141,11 +145,6 @@ public enum CommandType {
private AuditCommandParser commandParser;
private ScheduledThreadPoolExecutor progressExecutor;

@Override
public Class<? extends InputFormat> getInputFormat(Configuration conf) {
return NoSplitTextInputFormat.class;
}

@Override
public String getDescription() {
return "This mapper replays audit log files.";
@@ -155,6 +154,7 @@ public String getDescription() {
public List<String> getConfigDescriptions() {
return Lists.newArrayList(
INPUT_PATH_KEY + " (required): Path to directory containing input files.",
OUTPUT_PATH_KEY + " (required): Path to destination for output files.",
NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.",
CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " +
"performing `create` commands.",
@@ -166,7 +166,7 @@ public List<String> getConfigDescriptions() {

@Override
public boolean verifyConfigurations(Configuration conf) {
return conf.get(INPUT_PATH_KEY) != null;
return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null;
}

@Override
@@ -252,4 +252,19 @@ public void cleanup(Mapper.Context context) throws InterruptedException {
LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps);
}
}

@Override
public void configureJob(Job job) {
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setInputFormatClass(NoSplitTextInputFormat.class);

job.setNumReduceTasks(1);
job.setReducerClass(AuditReplayReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
}
}
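A small sketch of the configuration the audit replay workload now expects (the HDFS paths are placeholders): the output path becomes a required key alongside the input path, and verifyConfigurations rejects a conf that is missing either one.

import org.apache.hadoop.conf.Configuration;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;

public class AuditReplayConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(AuditReplayMapper.INPUT_PATH_KEY, "hdfs:///dyno/audit-logs");         // placeholder path
    conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, "hdfs:///dyno/replay-latencies");  // placeholder path
    conf.setInt(AuditReplayMapper.NUM_THREADS_KEY, 4); // optional; defaults to NUM_THREADS_DEFAULT
    // With both required keys present this returns true; dropping either one makes it false.
    System.out.println("valid = " + new AuditReplayMapper().verifyConfigurations(conf));
  }
}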
@@ -1,29 +1,17 @@
/*
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

import com.google.common.collect.Lists;
import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.List;

public class AuditReplayReducer extends
WorkloadReducer<Text, LongWritable, Text, LongWritable> {
public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";

@Override
public Class<? extends OutputFormat> getOutputFormat(Configuration conf) {
return AuditTextOutputFormat.class;
}

@Override
public boolean verifyConfigurations(Configuration conf) {
return conf.get(OUTPUT_PATH_KEY) != null;
}
Reducer<Text, LongWritable, Text, LongWritable> {

@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
@@ -62,6 +62,9 @@ public class AuditReplayThread extends Thread {
private long startTimestampMs;
private boolean createBlocks;

private Text userCommandKey = new Text();
private LongWritable userCommandLatency = new LongWritable();

// Counters are not thread-safe so we store a local mapping in our thread
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
@@ -259,11 +262,12 @@ public FileSystem run() {
break;
}

String key = command.getSimpleUgi() + "_" + replayCommand.getType().toString();
userCommandKey.set(command.getSimpleUgi() + "_" + replayCommand.getType().toString());
long latency = System.currentTimeMillis() - startTime;
userCommandLatency.set(latency);

try {
mapperContext.write(new Text(key), new LongWritable(latency));
mapperContext.write(userCommandKey, userCommandLatency);
} catch (InterruptedException|IOException e) {
throw new IOException("Error writing to context", e);
}

This file was deleted.
