Add support for a reducer step that aggregates per-user metrics #76

Merged · 10 commits · Feb 16, 2019
.gitignore (1 addition, 0 deletions)

@@ -14,3 +14,4 @@ scripts.tar
 *.iml
 .idea/
 gradle.out
+*/out
@@ -107,6 +107,8 @@ public class TestDynamometerInfra {
   private static final String NAMENODE_NODELABEL = "dyno_namenode";
   private static final String DATANODE_NODELABEL = "dyno_datanode";
 
+  private static final String OUTPUT_PATH = "/tmp/trace_output_direct";
+
   private static MiniDFSCluster miniDFSCluster;
   private static MiniYARNCluster miniYARNCluster;
   private static YarnClient yarnClient;
@@ -278,7 +280,7 @@ public void run() {
       "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop",
       "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
       "-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
-      "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/trace_output_direct")).toString(),
+      "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path(OUTPUT_PATH)).toString(),
       "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
       "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
       "-" + AMOptions.NAMENODE_ARGS_ARG, "-Ddfs.namenode.safemode.extension=0"
@@ -403,6 +405,8 @@ public Boolean get() {
         }
       }
     }, 3000, 60000);
+
+    assertTrue(fs.exists(new Path(OUTPUT_PATH)));
   }
 
   private static URI getResourcePath(String resourceName) {
@@ -13,7 +13,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,8 +15,8 @@
 
 /**
  * Represents the base class for a generic workload-generating mapper. By default, it will expect to use
- * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
- * should override the {@link #getInputFormat(Configuration)} method.
+ * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses requiring a reducer or expecting
+ * a different {@link InputFormat} should override the {@link #configureJob(Job)} method.
  */
 public abstract class WorkloadMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   /**
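
The updated javadoc makes configureJob(Job) the single hook for attaching a reducer or swapping the InputFormat. A minimal sketch of an overriding subclass, assuming a void configureJob(Job) signature; the class name and type parameters are invented for illustration and are not part of this PR:

// Hypothetical subclass showing the configureJob(Job) hook named in the javadoc.
public class PerUserLatencyMapper
    extends WorkloadMapper<LongWritable, NullWritable, UserCommandKey, LongWritable> {

  @Override
  public void configureJob(Job job) {
    // Attach the aggregation step on top of the default map-only setup.
    job.setMapOutputKeyClass(UserCommandKey.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setReducerClass(AuditReplayReducer.class);
    job.setNumReduceTasks(1);
  }

  // ... remaining WorkloadMapper overrides omitted for brevity.
}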
@@ -9,6 +9,12 @@
 
 import java.io.IOException;
 
+
+/**
+ * AuditReplayReducer aggregates the latency values emitted by {@link AuditReplayMapper},
+ * summing them per {@link UserCommandKey}, a composite of the id of the user who ran the
+ * command and the command's type (READ/WRITE).
+ */
 public class AuditReplayReducer extends
     Reducer<UserCommandKey, LongWritable, UserCommandKey, LongWritable> {
 
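
The hunk shows only the class declaration; the reduce body itself falls outside the visible diff. Assuming the aggregation the javadoc describes is a plain per-key sum, it would look roughly like this (a sketch, not the verbatim PR code):

@Override
protected void reduce(UserCommandKey key, Iterable<LongWritable> values, Context context)
    throws IOException, InterruptedException {
  // Sum every latency recorded for this (user, command type) pair.
  long totalLatency = 0;
  for (LongWritable latency : values) {
    totalLatency += latency.get();
  }
  context.write(key, new LongWritable(totalLatency));
}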
@@ -101,13 +101,9 @@ void drainCounters(Mapper.Context context) {
     }
   }
 
-  void drainCommandLatencies(Mapper.Context context) throws IOException {
+  void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException {
     for (Map.Entry<UserCommandKey, LongWritable> ent : commandLatencyMap.entrySet()) {
-      try {
-        context.write(ent.getKey(), ent.getValue());
-      } catch (IOException|InterruptedException e) {
-        throw new IOException("Error writing to context", e);
-      }
+      context.write(ent.getKey(), ent.getValue());
     }
   }
 
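
Dropping the try/catch stops laundering InterruptedException into IOException; both now propagate to the caller. That is safe because Hadoop's Mapper lifecycle methods already declare both checked exceptions, so a caller such as a cleanup hook (shown here as an assumption, not taken from the PR) can simply forward them:

// Mapper.cleanup(Context) declares both checked exceptions, so no wrapping is needed.
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  drainCommandLatencies(context);
}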
@@ -12,7 +12,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 
-
+/**
+ * UserCommandKey is a {@link Writable} used as a composite key combining the user id and
+ * the type of a replayed command. It is used as the output key for AuditReplayMapper and
+ * the input key for AuditReplayReducer.
+ */
 public class UserCommandKey implements WritableComparable {
   private Text user;
   private Text command;
@@ -36,18 +40,10 @@ public String getUser() {
     return user.toString();
   }
 
-  public void setUser(String user) {
-    this.user.set(user);
-  }
-
   public String getCommand() {
     return command.toString();
   }
 
-  public void setCommand(String command) {
-    this.command.set(command);
-  }
-
   @Override
   public void write(DataOutput out) throws IOException {
     user.write(out);
@@ -62,7 +58,7 @@ public void readFields(DataInput in) throws IOException {
 
   @Override
   public int compareTo(@Nonnull Object o) {
-    return Integer.compare(hashCode(), o.hashCode());
+    return toString().compareTo(o.toString());
   }
 
   @Override
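
Comparing hash codes broke the total-ordering contract that the shuffle's key sort relies on: hash codes of distinct keys can collide, so unequal keys could compare as equal, and the resulting order bears no meaningful relation to the key's content. Comparing toString() output gives a deterministic, content-based order. A field-wise comparison would achieve the same without building intermediate strings; this is an alternative sketch, not what the PR merged:

@Override
public int compareTo(@Nonnull Object o) {
  UserCommandKey other = (UserCommandKey) o;
  // Text compares byte-wise, so no intermediate String is allocated.
  int byUser = user.compareTo(other.user);
  return byUser != 0 ? byUser : command.compareTo(other.command);
}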
@@ -9,6 +9,7 @@
 import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser;
 import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
 import java.io.IOException;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -27,9 +28,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 public class TestWorkloadGenerator {
@@ -113,10 +112,9 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exception {
     assertFalse(dfs.exists(new Path("/denied")));
 
     assertTrue(dfs.exists(new Path(auditOutputPath)));
-    FSDataInputStream auditOutput = dfs.open(new Path(auditOutputPath + "/part-r-00000"));
-    byte[] buf = new byte[auditOutput.available()];
-    assertTrue(auditOutput.read(buf) > 0);
-    System.out.println(new String(buf));
-    assertTrue(new String(buf).matches(".*hdfs,WRITE\\t[0-9]+\\n.*"));
+    try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath + "/part-r-00000"))) {
+      String auditOutput = IOUtils.toString(auditOutputFile);
+      assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*"));
+    }
   }
 }
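
The tightened assertion also documents the reducer's on-disk format: per the regex, each line of part-r-00000 is UserCommandKey.toString() (rendered as user,command), a tab, and the summed latency. Illustrative output only; the numbers and the READ line are invented:

hdfs,WRITE	4731
hdfs,READ	1289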
}