From 9bcd9ea6d1eed3d9ddfdccf696417dad4a50de13 Mon Sep 17 00:00:00 2001 From: Hemika Yasinda Kodikara Date: Mon, 14 Sep 2015 13:10:37 +0530 Subject: [PATCH] Fixed issues. --- .../protocol/mqtt/client/MqttPublisher.java | 8 +- .../protocol/mqtt/client/MqttSubscriber.java | 6 +- .../mqtt/paho/clients/AsyncClient.java | 11 +- .../mqtt/paho/clients/BlockingClient.java" | 174 ++++++++++++++++++ .../protocol/mqtt/utilities/Utils.java" | 44 +++++ 5 files changed, 235 insertions(+), 8 deletions(-) create mode 100644 "src/main/java/org/apache\260jmeter/protocol/mqtt/paho/clients/BlockingClient.java" create mode 100644 "src/main/java/org/apache\260jmeter/protocol/mqtt/utilities/Utils.java" diff --git a/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttPublisher.java b/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttPublisher.java index a32ed26..fe72031 100755 --- a/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttPublisher.java +++ b/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttPublisher.java @@ -67,7 +67,7 @@ public void setupTest(JavaSamplerContext context) { String brokerURL = context.getParameter("BROKER_URL"); String clientId = context.getParameter("CLIENT_ID"); topicName = context.getParameter("TOPIC_NAME"); - retained = Boolean.parseBoolean(context.getParameter("CLEAN_SESSION")); + retained = Boolean.parseBoolean(context.getParameter("MESSAGE_RETAINED")); String username = context.getParameter("USERNAME"); String password = context.getParameter("PASSWORD"); String client_type = context.getParameter("CLIENT_TYPE"); @@ -75,8 +75,10 @@ public void setupTest(JavaSamplerContext context) { // Generating client ID if empty if (StringUtils.isEmpty(clientId)){ - clientId = System.currentTimeMillis() + "." + System.getProperty("user.name"); - clientId = clientId.substring(0, 23); + clientId = System.nanoTime() + "." + System.getProperty("user.name"); + if (clientId.length() > 23) { + clientId = clientId.substring(0, 23); + } } // Quality diff --git a/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttSubscriber.java b/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttSubscriber.java index 8c66ffe..6ce2c96 100755 --- a/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttSubscriber.java +++ b/src/main/java/org/apache/jmeter/protocol/mqtt/client/MqttSubscriber.java @@ -96,8 +96,10 @@ private void setupTest(String brokerURL, String clientId, String topic, // Generating client ID if empty if (StringUtils.isEmpty(clientId)){ - clientId = System.currentTimeMillis() + "." + System.getProperty("user.name"); - clientId = clientId.substring(0, 23); + clientId = System.nanoTime() + "." + System.getProperty("user.name"); + if (clientId.length() > 23) { + clientId = clientId.substring(0, 23); + } } // Quality diff --git a/src/main/java/org/apache/jmeter/protocol/mqtt/paho/clients/AsyncClient.java b/src/main/java/org/apache/jmeter/protocol/mqtt/paho/clients/AsyncClient.java index e895223..c964a36 100644 --- a/src/main/java/org/apache/jmeter/protocol/mqtt/paho/clients/AsyncClient.java +++ b/src/main/java/org/apache/jmeter/protocol/mqtt/paho/clients/AsyncClient.java @@ -67,9 +67,14 @@ public AsyncClient(String brokerUrl, String clientId, boolean cleanSession, // stored until the message has been delivered to the server. //..a real application ought to store them somewhere // where they are not likely to get deleted or tampered with - String testPlanFile = GuiPackage.getInstance().getTestPlanFile(); - String testPlanFileDir = FilenameUtils.getFullPathNoEndSeparator(testPlanFile); - testPlanFileDir = testPlanFileDir + File.separator + "tmp" + File.separator + clientId + File.separator + Thread.currentThread().getId(); + String testPlanFileDir = System.getProperty("java.io.tmpdir"); + + if (null != GuiPackage.getInstance()) { + String testPlanFile = GuiPackage.getInstance().getTestPlanFile(); + testPlanFileDir = FilenameUtils.getFullPathNoEndSeparator(testPlanFile); + testPlanFileDir = testPlanFileDir + File.separator + "tmp" + File.separator + clientId + File.separator + Thread.currentThread().getId(); + } + MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(testPlanFileDir); try { diff --git "a/src/main/java/org/apache\260jmeter/protocol/mqtt/paho/clients/BlockingClient.java" "b/src/main/java/org/apache\260jmeter/protocol/mqtt/paho/clients/BlockingClient.java" new file mode 100644 index 0000000..75800b8 --- /dev/null +++ "b/src/main/java/org/apache\260jmeter/protocol/mqtt/paho/clients/BlockingClient.java" @@ -0,0 +1,174 @@ +/** + * Author : Hemika Yasinda Kodikara + * + * Copyright (c) 2015. + */ + +package org.apache.jmeter.protocol.mqtt.paho.clients; + +import org.apache.commons.io.FilenameUtils; +import org.apache.jmeter.gui.GuiPackage; +import org.apache.jmeter.protocol.mqtt.data.objects.Message; +import org.apache.jorphan.logging.LoggingManager; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; + +import java.io.File; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A sample application that demonstrates how to use the Paho MQTT v3.1 Client blocking API. + *

+ * It can be run from the command line in one of two modes: + * - as a publisher, sending a single message to a topic on the server + * - as a subscriber, listening for messages from the server + *

+ * There are three versions of the sample that implement the same features + * but do so using using different programming styles: + *

    + *
  1. Sample (this one) which uses the API which blocks until the operation completes
  2. + *
  3. SampleAsyncWait shows how to use the asynchronous API with waiters that block until + * an action completes
  4. + *
  5. SampleAsyncCallBack shows how to use the asynchronous API where events are + * used to notify the application when an action completes
  6. + *
+ *

+ * If the application is run with the -h parameter then info is displayed that + * describes all of the options / parameters. + */ +public class BlockingClient extends BaseClient { + + private static final org.apache.log.Logger log = LoggingManager.getLoggerForClass(); + private MqttClient client; + private String brokerUrl; + + /** + * Constructs an instance of the sample client wrapper + * + * @param brokerUrl the url of the server to connect to + * @param clientId the client id to connect with + * @param cleanSession clear state at end of connection or not (durable or non-durable subscriptions) + * @param userName the username to connect with + * @param password the password for the user + * @throws MqttException + */ + public BlockingClient(String brokerUrl, String clientId, boolean cleanSession, String userName, + String password) throws MqttException { + this.brokerUrl = brokerUrl; + //This sample stores in a temporary directory... where messages temporarily + // stored until the message has been delivered to the server. + //..a real application ought to store them somewhere + // where they are not likely to get deleted or tampered with + String testPlanFileDir = System.getProperty("java.io.tmpdir"); + + if (null != GuiPackage.getInstance()) { + String testPlanFile = GuiPackage.getInstance().getTestPlanFile(); + testPlanFileDir = FilenameUtils.getFullPathNoEndSeparator(testPlanFile); + testPlanFileDir = testPlanFileDir + File.separator + "tmp" + File.separator + clientId + File.separator + Thread.currentThread().getId(); + } + + MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(testPlanFileDir); + + // Construct the connection options object that contains connection parameters + // such as cleanSession and LWT + MqttConnectOptions conOpt = new MqttConnectOptions(); + conOpt.setCleanSession(cleanSession); + if (password != null) { + conOpt.setPassword(password.toCharArray()); + } + if (userName != null) { + conOpt.setUserName(userName); + } + + // Construct an MQTT blocking mode client + client = new MqttClient(this.brokerUrl, clientId, dataStore); + + // Set this wrapper as the callback handler + client.setCallback(this); + + // Connect to the MQTT server + log.info("Connecting to " + brokerUrl + " with client ID '" + client.getClientId() + "' as a blocking client"); + client.connect(conOpt); + log.info("Connected"); + } + + /** + * {@inheritDoc} + */ + @Override + public void disconnect() throws MqttException { + // Disconnect the client + client.disconnect(); + log.info("Disconnected"); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isConnected() { + return client.isConnected(); + } + + /** + * {@inheritDoc} + */ + @Override + public void publish(String topicName, int qos, byte[] payload, boolean isRetained) throws MqttException { + // Create and configure a message + MqttMessage message = new MqttMessage(payload); + message.setRetained(isRetained); + message.setQos(qos); + + // Send the message to the server, control is not returned until + // it has been delivered to the server meeting the specified + // quality of service. + client.publish(topicName, message); + } + + /** + * {@inheritDoc} + */ + @Override + public void subscribe(String topicName, int qos) throws MqttException { + mqttMessageStorage = new ConcurrentLinkedQueue(); + receivedMessageCounter = new AtomicLong(0); + + // Subscribe to the requested topic + // The QoS specified is the maximum level that messages will be sent to the client at. + // For instance if QoS 1 is specified, any messages originally published at QoS 2 will + // be downgraded to 1 when delivering to the client but messages published at 1 and 0 + // will be received at the same level they were published at. + log.info("Subscribing to topic \"" + topicName + "\" qos " + qos); + client.subscribe(topicName, qos); + } + + /** + * {@inheritDoc} + */ + @Override + public void connectionLost(Throwable cause) { + log.info("Connection to " + brokerUrl + " lost!" + cause); + } + + /** + * {@inheritDoc} + */ + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + /** + * {@inheritDoc} + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws MqttException { + Message newMessage = new Message(message); + mqttMessageStorage.add(newMessage); + } +} \ No newline at end of file diff --git "a/src/main/java/org/apache\260jmeter/protocol/mqtt/utilities/Utils.java" "b/src/main/java/org/apache\260jmeter/protocol/mqtt/utilities/Utils.java" new file mode 100644 index 0000000..796ca52 --- /dev/null +++ "b/src/main/java/org/apache\260jmeter/protocol/mqtt/utilities/Utils.java" @@ -0,0 +1,44 @@ +/** + * Author : Hemika Yasinda Kodikara + * + * Copyright (c) 2015. + */ + +package org.apache.jmeter.protocol.mqtt.utilities; + +import org.apache.jorphan.io.TextFile; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; + +/** + * Utility class for plugin + */ +public class Utils { + + /** + * Creates a UUID. The UUID is modified to avoid "ClientId longer than 23 characters" for MQTT. + * + * @return A UUID as a string. + * @throws NoSuchAlgorithmException + */ + public static String UUIDGenerator() throws NoSuchAlgorithmException { + String clientId = System.currentTimeMillis() + "." + System.getProperty("user.name"); + if (clientId.length() > 23) { + clientId = clientId.substring(0, 23); + } + return clientId; + } + + /** + * The implementation uses TextFile to load the contents of the file and + * returns a string. + * + * @param path path to the file to read in + * @return the contents of the file + */ + public static String getFileContent(String path) { + TextFile tf = new TextFile(path); + return tf.getText(); + } +}