An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Thu May 22 23:58:08 2014 +0000
Parent:
30:a4e3a97dabe3
Child:
32:3ad9afa63299
Child:
34:e18a166198df
Commit message:
Create MQTTSocket.h to not use EthernetInterface

Changed in this revision

MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTTEthernet.h Show annotated file Show diff for this revision Revisions of this file
MQTTSocket.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.h	Tue May 20 15:07:11 2014 +0000
+++ b/MQTTClient.h	Thu May 22 23:58:08 2014 +0000
@@ -37,6 +37,7 @@
 
 enum QoS { QOS0, QOS1, QOS2 };
 
+// all failure return codes must be negative
 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
 
 
@@ -53,8 +54,13 @@
 
 struct MessageData
 {
-    struct Message message;
-    char* topicName;
+    MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
+    {
+
+    }
+    
+    struct Message &message;
+    MQTTString &topicName;
 };
 
 
@@ -75,6 +81,17 @@
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
+
+
+class QoS2
+{
+public:
+
+    
+private:
+
+
+};
   
   
 /**
@@ -86,12 +103,13 @@
  * @param Network a network class which supports send, receive
  * @param Timer a timer class with the methods: 
  */ 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
+class Client
 {
     
 public:
    
-    typedef void (*messageHandler)(Message*);
+    typedef void (*messageHandler)(MessageData&);
 
     /** Construct the client
      *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
@@ -136,7 +154,7 @@
      */   
     int unsubscribe(const char* topicFilter);
     
-    /** MQTT Disconnect - send an MQTT disconnect packet 
+    /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
      *  @return success code -  
      */
     int disconnect();
@@ -173,17 +191,24 @@
     
     PacketId packetid;
     
-    // typedef FP<void, Message*> messageHandlerFP;
     struct MessageHandlers
     {
         const char* topicFilter;
-        //messageHandlerFP fp; typedefs not liked?
-        FP<void, Message*> fp;
+        FP<void, MessageData&> fp;
     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
     
-    FP<void, Message*> defaultMessageHandler;
+    FP<void, MessageData&> defaultMessageHandler;
      
     bool isconnected;
+    
+#if 0
+    struct
+    {
+      bool used;
+      int id;  
+    } QoS2messages[MAX_QOS2_MESSAGES];
+    
+#endif
 
 };
 
@@ -335,7 +360,8 @@
         {
             if (messageHandlers[i].fp.attached())
             {
-                messageHandlers[i].fp(&message);
+                MessageData md(topicName, message);
+                messageHandlers[i].fp(md);
                 rc = SUCCESS;
             }
         }
@@ -343,7 +369,8 @@
     
     if (rc == FAILURE && defaultMessageHandler.attached()) 
     {
-        defaultMessageHandler(&message);
+        MessageData md(topicName, message);
+        defaultMessageHandler(md);
         rc = SUCCESS;
     }   
     
@@ -395,7 +422,15 @@
             if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
                                  (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 goto exit;
-            deliverMessage(topicName, msg);
+//          if (msg.qos != QOS2) 
+                deliverMessage(topicName, msg);
+#if 0
+            else if (isQoS2msgidFree(msg.id))
+            {
+                UseQoS2msgid(msg.id);
+                deliverMessage(topicName, msg);
+            }
+#endif
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
@@ -484,15 +519,18 @@
 {
     Timer connect_timer = Timer(command_timeout_ms);
     int rc = FAILURE;
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    int len = 0;
+    
+    if (isconnected) // don't send connect packet again if we are already connected
+        goto exit;
 
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
         options = &default_options; // set default options if none were supplied
     
     this->keepAliveInterval = options->keepAliveInterval;
     ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
-    if (len <= 0)
+    if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
         goto exit; // there was a problem
@@ -519,11 +557,11 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 { 
-    int rc = FAILURE;
+    int rc = FAILURE;  
     Timer timer = Timer(command_timeout_ms);
     int len = 0;
+    MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    MQTTString topic = {(char*)topicFilter, 0, 0};
     if (!isconnected)
         goto exit;
     
@@ -556,8 +594,6 @@
         rc = FAILURE;
         
 exit:
-    //if (rc == FAILURE)
-    //   closesession();
     return rc;
 }
 
@@ -566,12 +602,14 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {   
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
-    
+    Timer timer = Timer(command_timeout_ms);    
     MQTTString topic = {(char*)topicFilter, 0, 0};
+    int len = 0;
     
-    int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
-    if (len <= 0)
+    if (!isconnected)
+        goto exit;
+    
+    if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
@@ -595,14 +633,17 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
 {
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);   
+    MQTTString topicString = {(char*)topicName, 0, 0};
+    int len = 0;
     
-    MQTTString topicString = {(char*)topicName, 0, 0};
+    if (!isconnected)
+        goto exit;
 
     if (message->qos == QOS1 || message->qos == QOS2)
         message->id = packetid.getNext();
     
-    int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
+    len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
               topicString, (char*)message->payload, message->payloadlen);
     if (len <= 0)
         goto exit;
@@ -645,7 +686,8 @@
     int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     if (len > 0)
         rc = sendPacket(len, timer);            // send the disconnect packet
-
+        
+    isconnected = false;
     return rc;
 }
 
--- a/MQTTEthernet.h	Tue May 20 15:07:11 2014 +0000
+++ b/MQTTEthernet.h	Thu May 22 23:58:08 2014 +0000
@@ -4,46 +4,22 @@
 
 #include "MQTT_mbed.h"
 #include "EthernetInterface.h"
+#include "MQTTSocket.h"
 
-class MQTTEthernet
+class MQTTEthernet : public MQTTSocket
 {
 public:    
     MQTTEthernet()
     {
         eth.init();                          // Use DHCP
         eth.connect();
-        mysock.set_blocking(false, 1000);    // 1 second Timeout 
-    }
-    
-    int connect(char* hostname, int port)
-    {
-        return mysock.connect(hostname, port);
-    }
-
-    int read(char* buffer, int len, int timeout)
-    {
-        mysock.set_blocking(false, timeout);  
-        return mysock.receive(buffer, len);
-    }
-    
-    int write(char* buffer, int len, int timeout)
-    {
-        mysock.set_blocking(false, timeout);  
-        return mysock.send(buffer, len);
-    }
-    
-    int disconnect()
-    {
-        return mysock.close();
     }
     
 private:
 
     EthernetInterface eth;
-    TCPSocketConnection mysock; 
     
 };
 
 
-
 #endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSocket.h	Thu May 22 23:58:08 2014 +0000
@@ -0,0 +1,41 @@
+#if !defined(MQTTSOCKET_H)
+#define MQTTSOCKET_H
+
+#include "MQTT_mbed.h"
+#include "TCPSocketConnection.h"
+
+class MQTTSocket
+{
+public:    
+    int connect(char* hostname, int port, int timeout=1000)
+    {
+        mysock.set_blocking(false, timeout);    // 1 second Timeout 
+        return mysock.connect(hostname, port);
+    }
+
+    int read(char* buffer, int len, int timeout)
+    {
+        mysock.set_blocking(false, timeout);  
+        return mysock.receive(buffer, len);
+    }
+    
+    int write(char* buffer, int len, int timeout)
+    {
+        mysock.set_blocking(false, timeout);  
+        return mysock.send(buffer, len);
+    }
+    
+    int disconnect()
+    {
+        return mysock.close();
+    }
+    
+private:
+
+    TCPSocketConnection mysock; 
+    
+};
+
+
+
+#endif