From 67873d7fbeffae5e6ba2b6b10b4ce0e8928ac8dd Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Feb 2019 15:30:43 -0800 Subject: [PATCH] Refactor and clean up --- .../java/com/linkedin/dynamometer/Client.java | 8 +-- .../workloadgenerator/WorkloadDriver.java | 53 ++----------------- .../workloadgenerator/WorkloadMapper.java | 13 ++++- .../workloadgenerator/WorkloadReducer.java | 18 ------- .../audit/AuditReplayMapper.java | 29 +++++++--- .../audit/AuditReplayReducer.java | 24 +++------ .../audit/AuditReplayThread.java | 8 ++- .../audit/AuditTextOutputFormat.java | 26 --------- .../TestWorkloadGenerator.java | 38 +++++-------- 9 files changed, 63 insertions(+), 154 deletions(-) delete mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java delete mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 294fea47f9..5b06151e27 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -12,7 +12,6 @@ import com.google.common.base.StandardSystemProperty; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver; import java.io.File; @@ -39,7 +38,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -924,12 +922,8 @@ private 
void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { workloadConf.set(configPair.getKey(), configPair.getValue()); } - Class reducerClass = null; - if (workloadOutputPath != null) { - reducerClass = AuditReplayReducer.class; - } workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), - workloadStartTime, AuditReplayMapper.class, reducerClass); + workloadStartTime, AuditReplayMapper.class); workloadJob.submit(); while (!isCompleted(infraAppState) && !isCompleted(workloadAppState)) { workloadJob.monitorAndPrintJob(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index 863131b0fd..c88843861d 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -22,11 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -47,7 +43,6 @@ public class WorkloadDriver extends Configured implements Tool { public static final String START_TIME_OFFSET_DEFAULT = "1m"; public static final String NN_URI = "nn_uri"; public static final String MAPPER_CLASS_NAME = "mapper_class_name"; - public static final String REDUCER_CLASS_NAME = "reducer_class_name"; public int run(String[] args) throws Exception { Option helpOption = new 
Option("h", "help", false, "Shows this message. Additionally specify the " + MAPPER_CLASS_NAME @@ -68,11 +63,6 @@ public int run(String[] args) throws Exception { "1. AuditReplayMapper \n" + "2. CreateFileMapper \nFully specified class names are also supported.") .isRequired().create(MAPPER_CLASS_NAME); options.addOption(mapperClassOption); - Option reducerClassOption = OptionBuilder.withArgName("Reducer ClassName").hasArg().withDescription( - "Class name of the reducer (optional); must be a Reducer subclass. Reducers supported currently: \n" + - "1. AuditReplayReducer \nFully specified class names are also supported.") - .create(REDUCER_CLASS_NAME); - options.addOption(reducerClassOption); Options helpOptions = new Options(); helpOptions.addOption(helpOption); @@ -110,23 +100,15 @@ public int run(String[] args) throws Exception { System.err.println(getMapperUsageInfo(cli.getOptionValue(MAPPER_CLASS_NAME))); return 1; } - Class reducerClass = null; - if (cli.getOptionValue(REDUCER_CLASS_NAME) != null) { - reducerClass = getReducerClass(cli.getOptionValue(REDUCER_CLASS_NAME)); - if (!reducerClass.newInstance().verifyConfigurations(getConf())) { - System.err.println("Incorrect config for " + reducerClass.getName()); - return 1; - } - } - Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass, reducerClass); + Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass); boolean success = job.waitForCompletion(true); return success ? 
0 : 1; } public static Job getJobForSubmission(Configuration baseConf, String nnURI, long startTimestampMs, - Class mapperClass, Class reducerClass) throws IOException, ClassNotFoundException, + Class mapperClass) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { Configuration conf = new Configuration(baseConf); conf.set(NN_URI, nnURI); @@ -139,23 +121,8 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); - job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf)); - if (reducerClass == null) { - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); - job.setOutputFormatClass(NullOutputFormat.class); - } else { - job.setNumReduceTasks(1); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(LongWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setOutputFormatClass(reducerClass.newInstance().getOutputFormat(conf)); - job.setReducerClass(reducerClass); - } + + mapperClass.newInstance().configureJob(job); return job; } @@ -177,18 +144,6 @@ private Class getMapperClass(String className) throws return (Class) mapperClass; } - private Class getReducerClass(String className) throws ClassNotFoundException { - if (!className.contains(".")) { - className = WorkloadDriver.class.getPackage().getName() + "." 
+ className; - } - Class mapperClass = getConf().getClassByName(className); - if (!WorkloadReducer.class.isAssignableFrom(mapperClass)) { - throw new IllegalArgumentException(className + " is not a subclass of " + - WorkloadReducer.class.getCanonicalName()); - } - return (Class) mapperClass; - } - private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException, InstantiationException, IllegalAccessException { WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index e41e61973c..65817bf200 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -8,7 +8,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; /** @@ -21,7 +23,7 @@ public abstract class WorkloadMapper extends M /** * Return the input class to be used by this mapper. */ - public Class getInputFormat(Configuration conf) { + public static Class getInputFormat(Configuration conf) { return TimedInputFormat.class; } @@ -41,4 +43,13 @@ public Class getInputFormat(Configuration conf) { */ public abstract boolean verifyConfigurations(Configuration conf); + /** + * Configure the given job for this mapper; the default sets up a map-only job (zero reduce tasks) that produces no output. 
+ */ + public void configureJob(Job job) { + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java deleted file mode 100644 index 81855f3395..0000000000 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.linkedin.dynamometer.workloadgenerator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; - -import java.util.List; - -public abstract class WorkloadReducer - extends Reducer { - - public Class getOutputFormat(Configuration conf) { - return NullOutputFormat.class; - } - - public abstract boolean verifyConfigurations(Configuration conf); -} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 05372bf6a6..2e45454fb8 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -18,16 +18,19 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.READ; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.WRITE; @@ -58,6 +61,7 @@ public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; + public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; public static final String NUM_THREADS_KEY = "auditreplay.num-threads"; public static final int NUM_THREADS_DEFAULT = 1; public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks"; @@ -141,11 +145,6 @@ public enum CommandType { private AuditCommandParser commandParser; private ScheduledThreadPoolExecutor progressExecutor; - @Override - public Class getInputFormat(Configuration conf) { - return NoSplitTextInputFormat.class; - } - @Override public String getDescription() { return "This mapper replays audit log files."; @@ -155,6 +154,7 @@ public String getDescription() { public List getConfigDescriptions() { return Lists.newArrayList( INPUT_PATH_KEY + " (required): Path to directory containing input files.", + OUTPUT_PATH_KEY + " (required): Path to destination for output files.", NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.", CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " + "performing `create` commands.", @@ -166,7 +166,7 @@ public List getConfigDescriptions() { @Override public boolean verifyConfigurations(Configuration conf) { - return 
conf.get(INPUT_PATH_KEY) != null; + return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null; } @Override @@ -252,4 +252,19 @@ public void cleanup(Mapper.Context context) throws InterruptedException { LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps); } } + + @Override + public void configureJob(Job job) { + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setInputFormatClass(NoSplitTextInputFormat.class); + + job.setNumReduceTasks(1); + job.setReducerClass(AuditReplayReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + + TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 20f3df5a8f..2be1f9c4bb 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -1,29 +1,17 @@ +/* + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. 
+ */ package com.linkedin.dynamometer.workloadgenerator.audit; -import com.google.common.collect.Lists; -import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; -import java.util.List; public class AuditReplayReducer extends - WorkloadReducer { - public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; - - @Override - public Class getOutputFormat(Configuration conf) { - return AuditTextOutputFormat.class; - } - - @Override - public boolean verifyConfigurations(Configuration conf) { - return conf.get(OUTPUT_PATH_KEY) != null; - } + Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 3aa7d51c10..215ca3844e 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -62,6 +62,9 @@ public class AuditReplayThread extends Thread { private long startTimestampMs; private boolean createBlocks; + private Text userCommandKey = new Text(); + private LongWritable userCommandLatency = new LongWritable(); + // Counters are not thread-safe so we store a local mapping in our thread // and merge them all together at the end. 
private Map replayCountersMap = new HashMap<>(); @@ -259,11 +262,12 @@ public FileSystem run() { break; } - String key = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); + userCommandKey.set(command.getSimpleUgi() + "_" + replayCommand.getType().toString()); long latency = System.currentTimeMillis() - startTime; + userCommandLatency.set(latency); try { - mapperContext.write(new Text(key), new LongWritable(latency)); + mapperContext.write(userCommandKey, userCommandLatency); } catch (InterruptedException|IOException e) { throw new IOException("Error writing to context", e); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java deleted file mode 100644 index c8c80d8a7b..0000000000 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. - * See LICENSE in the project root for license information. 
- */ -package com.linkedin.dynamometer.workloadgenerator.audit; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -import java.io.IOException; -import java.util.List; - -public class AuditTextOutputFormat extends TextOutputFormat { - @Override - public void checkOutputSpecs(JobContext context) throws IOException { - context.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", - context.getConfiguration().get(AuditReplayReducer.OUTPUT_PATH_KEY)); - super.checkOutputSpecs(context); - } -} diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a9820ab4ef..a3db248826 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -4,9 +4,13 @@ */ package com.linkedin.dynamometer.workloadgenerator; -import com.linkedin.dynamometer.workloadgenerator.audit.*; - import java.io.IOException; + +import com.linkedin.dynamometer.workloadgenerator.audit.AuditCommandParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -55,33 +59,15 @@ public void tearDown() throws Exception { } } - @Test - public void testAuditWorkloadDirectParser() throws Exception { - String workloadInputPath = - TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); - conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkloadWithReducer(null); - } - - @Test - public void testAuditWorkloadHiveParser() throws Exception { - String workloadInputPath = - TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); - conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkloadWithReducer(null); - } - @Test public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); String auditOutputPath = "/reducer_output/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkloadWithReducer(AuditReplayReducer.class); + testAuditWorkload(); assertTrue(dfs.exists(new Path(auditOutputPath))); } @@ -91,9 +77,9 @@ public void testAuditWorkloadHiveParserWithOutput() throws Exception { TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); String auditOutputPath = "/reducer_output/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); + 
conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkloadWithReducer(AuditReplayReducer.class); + testAuditWorkload(); assertTrue(dfs.exists(new Path(auditOutputPath))); } @@ -116,10 +102,10 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au } } - private void testAuditWorkloadWithReducer(Class reducerClass) throws Exception { + private void testAuditWorkload() throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), - workloadStartTime, AuditReplayMapper.class, reducerClass); + workloadStartTime, AuditReplayMapper.class); boolean success = workloadJob.waitForCompletion(true); assertTrue("workload job should succeed", success); Counters counters = workloadJob.getCounters();