Skip to content

Commit

Permalink
subscriber update listopic, random suffix client id
Browse files Browse the repository at this point in the history
  • Loading branch information
tuanhiep committed May 4, 2014
1 parent 72c393f commit 5bd022c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ public void setupTest(String host, String clientId, int size) {
JMeterContext jmcx = JMeterContextService.getContext();
this.connectionArray= new FutureConnection[size];
if(size==1){
this.connectionArray[0]= createConnection(host,clientId+" "+jmcx.getThreadNum());
this.connectionArray[0]= createConnection(host,clientId+jmcx.getThreadNum());
this.connectionArray[0].connect().await();
}
else
{
for(int i = 0;i< size;i++){
this.connectionArray[i]= createConnection(host,clientId+" "+jmcx.getThreadNum()+" "+i);
this.connectionArray[i]= createConnection(host,clientId+jmcx.getThreadNum()+i);
this.connectionArray[i].connect().await();
}
}
Expand All @@ -117,13 +117,13 @@ public void setupTest(String host, String clientId, String user, String password
this.connectionArray= new FutureConnection[size];

if(size==1){
this.connectionArray[0]= createConnection(host,clientId+" "+jmcx.getThreadNum(),user,password);
this.connectionArray[0]= createConnection(host,clientId+jmcx.getThreadNum(),user,password);
this.connectionArray[0].connect().await();

}
else {
for(int i = 0;i< size;i++){
this.connectionArray[i]= createConnection(host,clientId+" "+jmcx.getThreadNum()+" "+i,user,password);
this.connectionArray[i]= createConnection(host,clientId+jmcx.getThreadNum()+i,user,password);
this.connectionArray[i].connect().await();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.jmeter.protocol.mqtt.client;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContext;
import org.apache.jmeter.threads.JMeterContextService;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
Expand All @@ -37,8 +40,7 @@ Licensed to the Apache Software Foundation (ASF) under one

public class MqttSubscriber extends AbstractJavaSamplerClient implements Serializable {
private static final long serialVersionUID = 1L;
private FutureConnection connection;

private FutureConnection[] connectionArray;


@Override
Expand All @@ -53,47 +55,67 @@ public Arguments getDefaultParameters() {
}

public void setupTest(JavaSamplerContext context){

if("TRUE".equals(context.getParameter("AUTH"))){
this.setupTest(context.getParameter( "HOST" ),
context.getParameter( "TOPIC" ),
Boolean.parseBoolean(context.getParameter("DURABLE")),
context.getParameter( "CLIENT_ID" ),
context.getParameter("USER"),
context.getParameter("PASSWORD"));

String host = context.getParameter("HOST");
String clientId = context.getParameter("CLIENT_ID");
if("TRUE".equalsIgnoreCase(context.getParameter("RANDOM_SUFFIX"))){
clientId= MqttPublisher.getClientId(clientId,Integer.parseInt(context.getParameter("SUFFIX_LENGTH")));
}
else {
setupTest(context.getParameter( "HOST" ),
context.getParameter( "TOPIC" ),
Boolean.parseBoolean(context.getParameter("DURABLE")),
context.getParameter( "CLIENT_ID" ));
if("FALSE".equals(context.getParameter("PER_TOPIC"))){
String topic= context.getParameter("TOPIC");
if("TRUE".equals(context.getParameter("AUTH"))){
setupTest(host,clientId,topic,context.getParameter("USER"),context.getParameter("PASSWORD"),1,Boolean.parseBoolean(context.getParameter("DURABLE")));
}
else{ setupTest(host, clientId,topic,1,Boolean.parseBoolean(context.getParameter("DURABLE")));}
}


}
private void setupTest(String host, String topic, boolean durable, String clientId,String user, String password){
else if("TRUE".equals(context.getParameter("PER_TOPIC"))){
String topics= context.getParameter("TOPIC");
String[] topicArray = topics.split("\\s*,\\s*");
int size= topicArray.length;
if("TRUE".equals(context.getParameter("AUTH"))){
setupTest(host,clientId,topics,context.getParameter("USER"),context.getParameter("PASSWORD"),size,Boolean.parseBoolean(context.getParameter("DURABLE")));
}
else { setupTest(host, clientId,topics,size,Boolean.parseBoolean(context.getParameter("DURABLE")));
}
}
}
private void setupTest(String host,String clientId, String topic, String user,String password,int size, boolean durable){
try {

this.connection = createConnection(host, clientId, durable,user,password);

this.connection.connect().await();

this.connection.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)}).await();

this.connectionArray= new FutureConnection[size];
JMeterContext jmcx = JMeterContextService.getContext();
if(size==1){
this.connectionArray[0]=createConnection(host, clientId+" "+jmcx.getThreadNum(), durable,user,password);
this.connectionArray[0].connect().await();
this.connectionArray[0].subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)}).await();
}
else if(size>1){
String[] topicArray = topic.split("\\s*,\\s*");
for(int j=0;j<size;j++){
this.connectionArray[j]=createConnection(host, clientId+" "+jmcx.getThreadNum()+""+j, durable,user,password);
this.connectionArray[j].connect().await();
this.connectionArray[j].subscribe(new Topic[]{new Topic(topicArray[j], QoS.EXACTLY_ONCE)}).await();
}
}
} catch (Exception e) {
getLogger().error(e.getMessage());
}
}
private void setupTest(String host, String topic, boolean durable, String clientId){
private void setupTest(String host, String clientId, String topic,int size, boolean durable){
try {

this.connection = createConnection(host, clientId, durable);

this.connection.connect().await();

this.connection.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)}).await();

this.connectionArray= new FutureConnection[size];
JMeterContext jmcx = JMeterContextService.getContext();
if(size==1){
this.connectionArray[0]=createConnection(host, clientId+jmcx.getThreadNum(), durable);
this.connectionArray[0].connect().await();
this.connectionArray[0].subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)}).await();
}
else if(size>1){
String[] topicArray = topic.split("\\s*,\\s*");
for(int j=0;j<size;j++){
this.connectionArray[j]=createConnection(host, clientId+jmcx.getThreadNum()+j, durable);
this.connectionArray[j].connect().await();
this.connectionArray[j].subscribe(new Topic[]{new Topic(topicArray[j], QoS.EXACTLY_ONCE)}).await();
}
}
} catch (Exception e) {
getLogger().error(e.getMessage());
}
Expand All @@ -109,34 +131,37 @@ private FutureConnection createConnection(String host, String clientId, boolean
return client.futureConnection();

}
private FutureConnection createConnection(String host, String clientId, boolean durable,String user,String password)
throws URISyntaxException{

private FutureConnection createConnection(String host, String clientId, boolean durable,String user,String password) throws URISyntaxException
{
MQTT client = new MQTT();
client.setHost(host);
client.setClientId(clientId);
client.setUserName(user);
client.setPassword(password);
client.setCleanSession(!durable);
return client.futureConnection();

}
private void consume(JavaSamplerContext context) throws Exception{
consume(Integer.parseInt(context.getParameter("AGGREGATE")),Long.parseLong(context.getParameter("TIMEOUT")));
String topics= context.getParameter("TOPIC");
String[] topicArray = topics.split("\\s*,\\s*");
int size= topicArray.length;
consume(Integer.parseInt(context.getParameter("AGGREGATE")),Long.parseLong(context.getParameter("TIMEOUT")),size);
}

private void consume(int aggregate,long timeout) throws Exception{

private void consume(int aggregate,long timeout,int size) throws Exception{
for(int i = 1; i <= aggregate; ++i){
Message msg = connection.receive().await(timeout, TimeUnit.MILLISECONDS);
if(msg == null){
getLogger().error("MQTT consumer timed out while waiting for a message. The test has been aborted.");
return;
}
msg.ack();
System.out.println(new String(msg.getPayload()));
getLogger().debug("consumed " + i);
}
for(int j=0;j<size;j++){
Message msg = this.connectionArray[j].receive().await(timeout, TimeUnit.MILLISECONDS);
if(msg == null){
System.out.println("MQTT consumer timed out while waiting for a message. The test has been aborted.");
getLogger().error("MQTT consumer timed out while waiting for a message. The test has been aborted.");
return;
}
msg.ack();
System.out.println(new String(msg.getPayload()));
getLogger().debug(j+" "+"consumed "+ i);
}
}
}

public SampleResult runTest(JavaSamplerContext context) {
Expand Down Expand Up @@ -171,7 +196,25 @@ public SampleResult runTest(JavaSamplerContext context) {


public void close() {
if(this.connection!= null)
this.connection.disconnect();
if(this.connectionArray!=null){
for(int p=0;p<this.connectionArray.length;p++){
if (this.connectionArray[p] != null)
this.connectionArray[p].disconnect();
this.connectionArray[p]=null;
}
}
this.connectionArray= null;
}

private static final String mycharset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
public static String getClientId(String clientPrefix, int suffixLength) {
Random rand = new Random(System.nanoTime()*System.currentTimeMillis());
StringBuilder sb = new StringBuilder();
sb.append(clientPrefix);
for (int i = 0; i < suffixLength; i++) {
int pos = rand.nextInt(mycharset.length());
sb.append(mycharset.charAt(pos));
}
return sb.toString();
}
}

0 comments on commit 5bd022c

Please sign in to comment.