Skip to content

Commit

Permalink
Code cleaned up. Generalized blocking client and async client.
Browse files Browse the repository at this point in the history
  • Loading branch information
hemikak committed Jun 7, 2015
1 parent a14cbf7 commit 02695ca
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 1,643 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@

# Maven
log/
target/
target/

# Project
dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.protocol.mqtt.control.gui.MQTTPublisherGui;
import org.apache.jmeter.protocol.mqtt.paho.clients.BaseClient;
import org.apache.jmeter.protocol.mqtt.paho.clients.SimpleAsyncWaitClient;
import org.apache.jmeter.protocol.mqtt.paho.clients.SimpleClient;
import org.apache.jmeter.protocol.mqtt.paho.clients.AsyncClient;
import org.apache.jmeter.protocol.mqtt.paho.clients.BlockingClient;
import org.apache.jmeter.samplers.SampleResult;
import org.eclipse.paho.client.mqttv3.MqttException;

Expand Down Expand Up @@ -66,11 +66,11 @@ public void setupTest(JavaSamplerContext context) {

// Quality
if (MQTTPublisherGui.EXACTLY_ONCE.equals(context.getParameter("QOS"))) {
qos = 0;
qos = 2;
} else if (MQTTPublisherGui.AT_LEAST_ONCE.equals(context.getParameter("QOS"))) {
qos = 1;
} else if (MQTTPublisherGui.AT_MOST_ONCE.equals(context.getParameter("QOS"))) {
qos = 2;
qos = 0;
}
topic = context.getParameter("TOPIC");
publishMessage = context.getParameter("MESSAGE");
Expand All @@ -79,10 +79,10 @@ public void setupTest(JavaSamplerContext context) {
setupTest(host, clientId, context.getParameter("USER"), context.getParameter("PASSWORD"));
}

public void setupTest(String host, String clientId, String userName, String password) {
public void setupTest(String brokerURL, String clientId, String userName, String password) {
try {
client = new SimpleClient(host, clientId, false, false, userName, password);
} catch (Exception e) {
client = new BlockingClient(brokerURL, clientId, false, userName, password);
} catch (MqttException e) {
getLogger().error(e.getMessage(), e);
}
}
Expand All @@ -92,7 +92,7 @@ public SampleResult runTest(JavaSamplerContext context) {
try {
String host = context.getParameter("HOST");
String clientId = context.getParameter("CLIENT_ID");
client = new SimpleAsyncWaitClient(host, clientId, false, false, context.getParameter("USER"),
client = new AsyncClient(host, clientId, false, context.getParameter("USER"),
context.getParameter("PASSWORD"));
} catch (MqttException e) {
e.printStackTrace();
Expand All @@ -109,7 +109,6 @@ public SampleResult runTest(JavaSamplerContext context) {
publishedMessageCount.incrementAndGet();
return result;
} catch (MqttException e) {
e.printStackTrace();
result.sampleEnd(); // stop stopwatch
result.setSuccessful(false);
result.setResponseMessage("Exception: " + e.toString());
Expand Down
184 changes: 34 additions & 150 deletions src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,22 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.protocol.mqtt.control.gui.MQTTSubscriberGui;
import org.apache.jmeter.protocol.mqtt.sampler.SubscriberSampler;
import org.apache.jmeter.protocol.mqtt.data.objects.Message;
import org.apache.jmeter.protocol.mqtt.paho.clients.BaseClient;
import org.apache.jmeter.protocol.mqtt.paho.clients.BlockingClient;
import org.apache.jmeter.samplers.SampleResult;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class MqttSubscriber extends AbstractJavaSamplerClient implements
Serializable {
private static ConcurrentLinkedQueue<byte[]> mqttMessageStorage = new ConcurrentLinkedQueue<byte[]>();
private static AtomicLong receivedMessageCount;
private static MqttAsyncClient client;
private String topic;

private static BaseClient client;
private static final long serialVersionUID = 1L;
private boolean interrupted;
private String lineSeparator = System.getProperty("line.separator");

@Override
public Arguments getDefaultParameters() {
Expand All @@ -76,41 +65,22 @@ public void setupTest(JavaSamplerContext context) {

}

private void setupTest(String host, String clientId, String topic,
String user, String password, boolean cleanSession,
private void setupTest(String brokerURL, String clientId, String topic,
String userName, String password, boolean cleanSession,
String quality) {
try {
// Quality
int qos = 0;
if (MQTTSubscriberGui.EXACTLY_ONCE.equals(quality)) {
qos = 0;
qos = 2;
} else if (MQTTSubscriberGui.AT_LEAST_ONCE.equals(quality)) {
qos = 1;
} else if (MQTTSubscriberGui.AT_MOST_ONCE.equals(quality)) {
qos = 2;
}

this.topic = topic;

String generateClientId = MqttAsyncClient.generateClientId();
if(generateClientId.length() > 20){
generateClientId = generateClientId.substring(0, 20);
qos = 0;
}

client = new MqttAsyncClient(host, generateClientId, new MemoryPersistence());
JMeterIMqttSubscriberActionListener actionListener = new JMeterIMqttSubscriberActionListener(topic, qos, client);

MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setUserName(user);
connectOptions.setPassword(password.toCharArray());
connectOptions.setCleanSession(cleanSession);

client.connect(connectOptions, null, actionListener);

receivedMessageCount = new AtomicLong(0);

JMeterMqttSubscriberCallback callback = new JMeterMqttSubscriberCallback();
client.setCallback(callback);
client = new BlockingClient(brokerURL, clientId, cleanSession, userName, password);
client.subscribe(topic, qos);
} catch (MqttSecurityException e) {
getLogger().error("Security related error occurred", e);
} catch (MqttException e) {
Expand All @@ -122,129 +92,43 @@ public SampleResult runTest(JavaSamplerContext context) {
SampleResult result = new SampleResult();
result.sampleStart();

byte[] messagePayload = null;
while(!interrupted && null != mqttMessageStorage && null != receivedMessageCount){
messagePayload = mqttMessageStorage.poll();
receivedMessageCount.incrementAndGet();
if(messagePayload != null){
break;
Message receivedMessage;
while (!interrupted && null != client.getReceivedMessages() && null != client.getReceivedMessageCounter()) {
receivedMessage = client.getReceivedMessages().poll();
client.getReceivedMessageCounter().incrementAndGet();
if (receivedMessage != null) {
result.sampleEnd();
result.setSuccessful(true);
result.setResponseMessage("Received " + client.getReceivedMessageCounter().get() + " messages." +
lineSeparator + "Current message QOS : " + receivedMessage.getQos() +
lineSeparator + "Is current message a duplicate : " + receivedMessage.isDup()
+ lineSeparator + "Received timestamp of current message : " +
receivedMessage.getCurrentTimestamp() + lineSeparator + "Is current message" +
" a retained message : " + receivedMessage.isRetained());
result.setBytes(receivedMessage.getPayload().length);
result.setResponseData(receivedMessage.getPayload());
result.setResponseCodeOK();
return result;
}
}

result.setSuccessful(true);
result.setResponseMessage("Received " + receivedMessageCount.get() + " messages(may be incorrect)");

result.setResponseData(messagePayload);
result.sampleEnd(); // stop stopwatch
result.setResponseCode("OK");
result.setSuccessful(false);
result.setResponseMessage("Error occurred while receiving messages. Received " + client
.getReceivedMessageCounter().get() + " valid messages.");
result.sampleEnd();
result.setResponseCode("FAILED");
return result;

// try {
// while (true) {
// if(client.isConnected() && null != mqttMessageStorage && null != receivedMessageCount){
// MqttMessage message = mqttMessageStorage.poll();
// if (null != message) {
// if (null != context.getParameter("TIMEOUT") && !context.getParameter("TIMEOUT").equals("")) {
// Thread.sleep(Long.parseLong(context.getParameter("TIMEOUT")));
// }
// result.setSuccessful(true);
// result.setResponseMessage("Received " + receivedMessageCount.get() + " messages(may be incorrect)");
//
// result.setResponseData(message.getPayload());
// result.sampleEnd(); // stop stopwatch
// result.setResponseCode("OK");
// return result;
// }
// }else if(interrupted){
// result.setSuccessful(false);
// result.setResponseMessage("Client is being stopped");
// result.sampleEnd(); // stop stopwatch
// result.setResponseCode("OK");
// return result;
// }
// }
// } catch (InterruptedException e) {
// result.sampleEnd(); // stop stopwatch
// result.setSuccessful(false);
// result.setResponseMessage("Exception: " + e);
// // get stack trace as a String to return as document data
// java.io.StringWriter stringWriter = new java.io.StringWriter();
// e.printStackTrace(new java.io.PrintWriter(stringWriter));
// result.setResponseData(stringWriter.toString(), null);
// result.setDataType(org.apache.jmeter.samplers.SampleResult.TEXT);
// result.setResponseCode("FAILED");
// return result;
// }
}

public void close(boolean interrupted) {
try {
this.interrupted = interrupted;
if (null != client && client.isConnected()) {
client.unsubscribe(this.topic);
client.disconnect();
getLogger().info("Client disconnected");
client.close();
getLogger().info("Client closed");
}
} catch (MqttException e) {
getLogger().error("Error when closing subscriber", e);
}
}

private class JMeterIMqttSubscriberActionListener implements IMqttActionListener{

private String topic;
private int qos;
private MqttAsyncClient asyncClient;

public JMeterIMqttSubscriberActionListener(String topic, int qos, MqttAsyncClient asyncClient) {
this.topic = topic;
this.qos = qos;
this.asyncClient = asyncClient;
}

@Override
public void onSuccess(IMqttToken iMqttToken) {
try {
asyncClient.subscribe(topic, qos, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
getLogger().info("Successfully subscribed");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
getLogger().info("Subscribing failed");
}
});

} catch (MqttException e) {
getLogger().error("Unable to subscribe", e);
}
}

@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
getLogger().error("Unable to subscribe", throwable);
}
}

private class JMeterMqttSubscriberCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
getLogger().error("Connection lost on callback", throwable);
}

@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
if (null != mqttMessage) {
mqttMessageStorage.add(mqttMessage.getPayload());
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.jmeter.protocol.mqtt.data.objects;

import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
*
*/
public class Message {
private byte[] payload;
private int qos = 0;
private boolean retained = false;
private boolean dup = false;
private long currentTimestamp;

public Message(byte[] payload, int qos, boolean retained, boolean dup, long currentTimestamp) {
this.payload = payload;
this.qos = qos;
this.retained = retained;
this.dup = dup;
this.currentTimestamp = currentTimestamp;
}

public Message(MqttMessage mqttMessage) {
this.payload = mqttMessage.getPayload();
this.qos = mqttMessage.getQos();
this.retained = mqttMessage.isRetained();
this.dup = mqttMessage.isDuplicate();
this.currentTimestamp = System.currentTimeMillis();
}

public byte[] getPayload() {
return payload;
}

public int getQos() {
return qos;
}

public boolean isRetained() {
return retained;
}

public void setRetained(boolean retained) {
this.retained = retained;
}

public boolean isDup() {
return dup;
}

public long getCurrentTimestamp() {
return currentTimestamp;
}
}
Loading

0 comments on commit 02695ca

Please sign in to comment.