An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Sun May 11 18:19:07 2014 +0000
Parent:
23:05fc7de97d4a
Child:
27:8e27b74cfdc9
Commit message:
Wildcard subscription support. Limits as template parameters.

Changed in this revision

MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.lib Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.h	Tue May 06 09:44:23 2014 +0000
+++ b/MQTTClient.h	Sun May 11 18:19:07 2014 +0000
@@ -18,13 +18,7 @@
  
  TODO: 
  
- log messages - use macros
- 
- define return code constants
- 
- call connectionLost at appropriate points - in sendPacket and readPacket
- 
- match wildcard topics
+ ensure publish packets are retried on reconnect
  
  */
 
@@ -94,14 +88,7 @@
 {
     
 public:
-
-    typedef struct
-    {
-        Client* client;
-        Network* network;
-    } connectionLostInfo;
-    
-    typedef int (*connectionLostHandlers)(connectionLostInfo*);
+   
     typedef void (*messageHandler)(Message*);
 
     /** Construct the client
@@ -111,14 +98,6 @@
      */
     Client(Network& network, unsigned int command_timeout_ms = 30000); 
     
-    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
-     *  @param clh - pointer to the callback function
-     */
-    void setConnectionLostHandler(connectionLostHandlers clh)
-    {
-        connectionLostHandler.attach(clh);
-    }
-    
     /** Set the default message handling callback - used for any message which does not match a subscription message handler
      *  @param mh - pointer to the callback function
      */
@@ -164,8 +143,9 @@
      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
      *  received.
      *  @param timeout_ms the time to wait, in milliseconds
+     *  @return success code - on failure, this means the client has disconnected
      */
-    void yield(int timeout_ms = 1000);
+    int yield(int timeout_ms = 1000);
     
 private:
 
@@ -176,7 +156,8 @@
     int decodePacket(int* value, int timeout);
     int readPacket(Timer& timer);
     int sendPacket(int length, Timer& timer);
-    int deliverMessage(MQTTString* topic, Message* message);
+    int deliverMessage(MQTTString& topicName, Message& message);
+    bool isTopicMatched(char* topicFilter, MQTTString& topicName);
     
     Network& ipstack;
     unsigned int command_timeout_ms;
@@ -193,15 +174,13 @@
     typedef FP<void, Message*> messageHandlerFP;
     struct MessageHandlers
     {
-        const char* topic;
+        const char* topicFilter;
         messageHandlerFP fp;
     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
     
     messageHandlerFP defaultMessageHandler;
     
-    typedef FP<int, connectionLostInfo*> connectionLostFP;
-    connectionLostFP connectionLostHandler;
-    
+    bool isconnected;
 };
 
 }
@@ -213,8 +192,9 @@
     ping_timer = Timer();
     ping_outstanding = 0;
     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-        messageHandlers[i].topic = 0;
-    this->command_timeout_ms = command_timeout_ms;    
+        messageHandlers[i].topicFilter = 0;
+    this->command_timeout_ms = command_timeout_ms; 
+    isconnected = false;
 }
 
 
@@ -227,13 +207,9 @@
     while (sent < length && !timer.expired())
     {
         rc = ipstack.write(&buf[sent], length, timer.left_ms());
-        if (rc == -1)
-        {
-            connectionLostInfo info = {this, &ipstack};
-            connectionLostHandler(&info);
-        }
-        else
-            sent += rc;
+        if (rc < 0)  // there was an error writing the data
+            break;
+        sent += rc;
     }
     if (sent == length)
     {
@@ -246,7 +222,8 @@
 }
 
 
-template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -283,7 +260,7 @@
 template<class Network, class Timer, int a, int b> 
 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
 {
-    int rc = -1;
+    int rc = FAILURE;
     MQTTHeader header = {0};
     int len = 0;
     int rem_len = 0;
@@ -308,23 +285,63 @@
 }
 
 
+// assume topic filter and name is in correct format
+// # can only be at end
+// + and # can only be next to separator
+template<class Network, class Timer, int a, int b> 
+bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
+{
+    char* curf = topicFilter;
+    char* curn = topicName.lenstring.data;
+    char* curn_end = curn + topicName.lenstring.len;
+    
+    while (*curf && curn < curn_end)
+    {
+        if (*curn == '/' && *curf != '/')
+            break;
+        if (*curf != '+' && *curf != '#' && *curf != *curn)
+            break;
+        if (*curf == '+')
+        {   // skip until we meet the next separator, or end of string
+            char* nextpos = curn + 1;
+            while (nextpos < curn_end && *nextpos != '/')
+                nextpos = ++curn + 1;
+        }
+        else if (*curf == '#')
+            curn = curn_end - 1;    // skip until end of string
+        curf++;
+        curn++;
+    };
+    
+    return (curn == curn_end) && (*curf == '\0');
+}
+
+
+
 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
-int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message)
+int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
 {
-    int rc = -1;
+    int rc = FAILURE;
 
     // we have to find the right message handler - indexed by topic
     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
     {
-        if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
+        if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
+                isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
         {
-            messageHandlers[i].fp(message);
-            rc = 0;
-            break;
+            if (messageHandlers[i].fp.attached())
+            {
+                messageHandlers[i].fp(&message);
+                rc = SUCCESS;
+            }
         }
     }
-    if (rc == -1)
-        defaultMessageHandler(message);
+    
+    if (rc == FAILURE && defaultMessageHandler.attached()) 
+    {
+        defaultMessageHandler(&message);
+        rc = SUCCESS;
+    }   
     
     return rc;
 }
@@ -332,13 +349,22 @@
 
 
 template<class Network, class Timer, int a, int b> 
-void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
+int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
 {
+    int rc = SUCCESS;
     Timer timer = Timer();
     
     timer.countdown_ms(timeout_ms);
     while (!timer.expired())
-        cycle(timer);
+    {
+        if (cycle(timer) == FAILURE)
+        {
+            rc = FAILURE;
+            break;
+        }
+    }
+        
+    return rc;
 }
 
 
@@ -350,7 +376,8 @@
     // read the socket, see what work is due
     int packet_type = readPacket(timer);
     
-    int len, rc;
+    int len = 0,
+        rc = SUCCESS;
     switch (packet_type)
     {
         case CONNACK:
@@ -360,27 +387,34 @@
         case PUBLISH:
             MQTTString topicName;
             Message msg;
-            rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);;
-            deliverMessage(&topicName, &msg);
+            if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+                goto exit;
+            deliverMessage(topicName, msg);
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
                     len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
                 else if (msg.qos == QOS2)
                     len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
-                if ((rc = sendPacket(len, timer)) != SUCCESS)
+                if (len <= 0)
+                    rc = FAILURE;
+                else
+                    rc = sendPacket(len, timer);
+                if (rc == FAILURE)
                     goto exit; // there was a problem
             }
             break;
         case PUBREC:
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-                ; 
-            len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
-            if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+                rc = FAILURE;
+            else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
+                rc = FAILURE;
+            else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
+                rc = FAILURE; // there was a problem
+            if (rc == FAILURE)
                 goto exit; // there was a problem
-
             break;
         case PUBCOMP:
             break;
@@ -390,30 +424,30 @@
     }
     keepalive();
 exit:
-    return packet_type;
+    if (rc == SUCCESS)
+        rc = packet_type;
+    return rc;
 }
 
 
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
 {
-    int rc = 0;
+    int rc = FAILURE;
 
     if (keepAliveInterval == 0)
+    {
+        rc = SUCCESS;
         goto exit;
+    }
 
     if (ping_timer.expired())
     {
-        if (ping_outstanding)
-            rc = -1;
-        else
+        if (!ping_outstanding)
         {
             Timer timer = Timer(1000);
             int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
-            rc = sendPacket(len, timer); // send the ping packet
-            if (rc != SUCCESS) 
-                rc = -1; // indicate there's a problem
-            else
+            if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
                 ping_outstanding = true;
         }
     }
@@ -427,7 +461,7 @@
 template<class Network, class Timer, int a, int b> 
 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
 {
-    int rc = -1;
+    int rc = FAILURE;
     
     do
     {
@@ -444,6 +478,7 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
 {
     Timer connect_timer = Timer(command_timeout_ms);
+    int rc = FAILURE;
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
@@ -452,8 +487,9 @@
     this->keepAliveInterval = options->keepAliveInterval;
     ping_timer.countdown(this->keepAliveInterval);
     int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
-    int rc = sendPacket(len, connect_timer); // send the connect packet
-    if (rc != SUCCESS) 
+    if (len <= 0)
+        goto exit;
+    if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
         goto exit; // there was a problem
     
     // this will be a blocking call, wait for the connack
@@ -462,9 +498,15 @@
         int connack_rc = -1;
         if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = connack_rc;
+        else
+            rc = FAILURE;
     }
+    else
+        rc = FAILURE;
     
 exit:
+    if (rc == SUCCESS)
+        isconnected = true;
     return rc;
 }
 
@@ -472,17 +514,19 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 { 
-    int len = -1;
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);
+    int len = 0;
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
-    
-    int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    if (rc <= 0)
+    if (!isconnected)
         goto exit;
-    len = rc;
+    
+    len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    if (len <= 0)
+        goto exit;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit; // there was a problem
+        goto exit;             // there was a problem
     
     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
     {
@@ -493,9 +537,9 @@
         {
             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
             {
-                if (messageHandlers[i].topic == 0)
+                if (messageHandlers[i].topicFilter == 0)
                 {
-                    messageHandlers[i].topic = topicFilter;
+                    messageHandlers[i].topicFilter = topicFilter;
                     messageHandlers[i].fp.attach(messageHandler);
                     rc = 0;
                     break;
@@ -503,8 +547,12 @@
             }
         }
     }
-    
+    else 
+        rc = FAILURE;
+        
 exit:
+    //if (rc == FAILURE)
+    //   closesession();
     return rc;
 }
 
@@ -512,15 +560,14 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {   
-    int len = -1;
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
-    if (rc <= 0)
+    int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
+    if (len <= 0)
         goto exit;
-    len = rc;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
@@ -530,6 +577,8 @@
         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = 0; 
     }
+    else
+        rc = FAILURE;
     
 exit:
     return rc;
@@ -540,6 +589,7 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
 {
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);
     
     MQTTString topicString = {(char*)topicName, 0, 0};
@@ -549,8 +599,9 @@
     
     int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
               topicString, (char*)message->payload, message->payloadlen);
-    int rc = sendPacket(len, timer); // send the subscribe packet
-    if (rc != SUCCESS) 
+    if (len <= 0)
+        goto exit;
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
     if (message->qos == QOS1)
@@ -558,18 +609,22 @@
         if (waitfor(PUBACK, timer) == PUBACK)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-                rc = 0; 
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+                rc = FAILURE;
         }
+        else
+            rc = FAILURE;
     }
     else if (message->qos == QOS2)
     {
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-                rc = 0; 
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+                rc = FAILURE;
         }
+        else
+            rc = FAILURE;
     }
     
 exit:
@@ -580,10 +635,12 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
 {  
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
     int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
-    int rc = sendPacket(len, timer);   // send the disconnect packet
-    
+    if (len > 0)
+        rc = sendPacket(len, timer);            // send the disconnect packet
+
     return rc;
 }
 
--- a/MQTTPacket.lib	Tue May 06 09:44:23 2014 +0000
+++ b/MQTTPacket.lib	Sun May 11 18:19:07 2014 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/teams/mqtt/code/MQTTPacket/#1b8fb13fc6ef
+http://mbed.org/teams/mqtt/code/MQTTPacket/#b97b9873af52