An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Mon Sep 25 12:06:28 2017 +0000
Parent:
52:3f9919941b86
Child:
54:ff9e5c4b52d0
Commit message:
Latest code update from Paho - 1.1

Changed in this revision

MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.lib Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.h	Mon Sep 25 11:12:23 2017 +0000
+++ b/MQTTClient.h	Mon Sep 25 12:06:28 2017 +0000
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2014, 2015 IBM Corp.
+ * Copyright (c) 2014, 2017 IBM Corp.
  *
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -16,6 +16,9 @@
  *    Ian Craggs - fix for bug 460389 - send loop uses wrong length
  *    Ian Craggs - fix for bug 464169 - clearing subscriptions
  *    Ian Craggs - fix for bug 464551 - enums and ints can be different size
+ *    Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
+ *    Ian Craggs - fix for bug 475749 - packetid modified twice
+ *    Ian Craggs - add ability to set message handler separately #6
  *******************************************************************************/
 
 #if !defined(MQTTCLIENT_H)
@@ -23,7 +26,7 @@
 
 #include "FP.h"
 #include "MQTTPacket.h"
-#include "stdio.h"
+#include <stdio.h>
 #include "MQTTLogging.h"
 
 #if !defined(MQTTCLIENT_QOS1)
@@ -64,6 +67,19 @@
 };
 
 
+struct connackData
+{
+    int rc;
+    bool sessionPresent;
+};
+
+
+struct subackData
+{
+    int grantedQoS;
+};
+
+
 class PacketId
 {
 public:
@@ -74,7 +90,7 @@
 
     int getNext()
     {
-        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+        return next = (next == MAX_PACKET_ID) ? 1 : next + 1;
     }
 
 private:
@@ -108,34 +124,51 @@
     Client(Network& network, unsigned int command_timeout_ms = 30000);
 
     /** Set the default message handling callback - used for any message which does not match a subscription message handler
-     *  @param mh - pointer to the callback function
+     *  @param mh - pointer to the callback function.  Set to 0 to remove.
      */
     void setDefaultMessageHandler(messageHandler mh)
     {
-        defaultMessageHandler.attach(mh);
+        if (mh != 0)
+            defaultMessageHandler.attach(mh);
+        else
+            defaultMessageHandler.detach();
     }
 
+    /** Set a message handling callback.  This can be used outside of the the subscribe method.
+     *  @param topicFilter - a topic pattern which can include wildcards
+     *  @param mh - pointer to the callback function. If 0, removes the callback if any
+     */
+    int setMessageHandler(const char* topicFilter, messageHandler mh);
+
     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
      *  The nework object must be connected to the network endpoint before calling this
      *  Default connect options are used
      *  @return success code -
      */
     int connect();
-    
-        /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+
+    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
      *  The nework object must be connected to the network endpoint before calling this
      *  @param options - connect options
      *  @return success code -
      */
     int connect(MQTTPacket_connectData& options);
 
+    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+     *  The nework object must be connected to the network endpoint before calling this
+     *  @param options - connect options
+     *  @param connackData - connack data to be returned
+     *  @return success code -
+     */
+    int connect(MQTTPacket_connectData& options, connackData& data);
+
     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
      *  @param topic - the topic to publish to
      *  @param message - the message to send
      *  @return success code -
      */
     int publish(const char* topicName, Message& message);
-    
+
     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
      *  @param topic - the topic to publish to
      *  @param payload - the data to send
@@ -145,12 +178,12 @@
      *  @return success code -
      */
     int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
-    
+
     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
      *  @param topic - the topic to publish to
      *  @param payload - the data to send
      *  @param payloadlen - the length of the data
-     *  @param id - the packet id used - returned 
+     *  @param id - the packet id used - returned
      *  @param qos - the QoS to send the publish at
      *  @param retained - whether the message should be retained
      *  @return success code -
@@ -165,6 +198,15 @@
      */
     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
 
+    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
+     *  @param topicFilter - a topic pattern which can include wildcards
+     *  @param qos - the MQTT QoS to subscribe at©
+     *  @param mh - the callback function to be invoked when a message is received for this subscription
+     *  @param
+     *  @return success code -
+     */
+    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
+
     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
      *  @param topicFilter - a topic pattern which can include wildcards
      *  @return success code -
@@ -194,6 +236,7 @@
 
 private:
 
+    void closeSession();
     void cleanSession();
     int cycle(Timer& timer);
     int waitfor(int packet_type, Timer& timer);
@@ -253,12 +296,10 @@
 
 
 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
-void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() 
+void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
 {
-    ping_outstanding = false;
     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
         messageHandlers[i].topicFilter = 0;
-    isconnected = false;
 
 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
     inflightMsgid = 0;
@@ -274,12 +315,21 @@
 
 
 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
+void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
+{
+    ping_outstanding = false;
+    isconnected = false;
+    if (cleansession)
+        cleanSession();
+}
+
+
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
 {
-    last_sent = Timer();
-    last_received = Timer();
     this->command_timeout_ms = command_timeout_ms;
-    cleanSession();
+    cleansession = true;
+      closeSession();
 }
 
 
@@ -347,7 +397,7 @@
     }
     else
         rc = FAILURE;
-        
+
 #if defined(MQTT_DEBUG)
     char printbuf[150];
     DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
@@ -389,7 +439,7 @@
  * If any read fails in this method, then we should disconnect from the network, as on reconnect
  * the packets can be retried.
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
- * @return the MQTT packet type, or -1 if none
+ * @return the MQTT packet type, 0 if none, -1 if error
  */
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
@@ -400,7 +450,8 @@
     int rem_len = 0;
 
     /* 1. read the header byte.  This has the packet type in it */
-    if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
+    rc = ipstack.read(readbuf, 1, timer.left_ms());
+    if (rc != 1)
         goto exit;
 
     len = 1;
@@ -423,12 +474,13 @@
     if (this->keepAliveInterval > 0)
         last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
 exit:
-        
+
 #if defined(MQTT_DEBUG)
     if (rc >= 0)
     {
         char printbuf[50];
-        DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
+        DEBUG("Rc %d from receiving packet %s\n", rc,
+            MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
     }
 #endif
     return rc;
@@ -504,7 +556,7 @@
 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
 {
     int rc = SUCCESS;
-    Timer timer = Timer();
+    Timer timer;
 
     timer.countdown_ms(timeout_ms);
     while (!timer.expired())
@@ -523,19 +575,19 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
 {
-    /* get one piece of work off the wire and one pass through */
-
-    // read the socket, see what work is due
-    int packet_type = readPacket(timer);
-
+    // get one piece of work off the wire and one pass through
     int len = 0,
         rc = SUCCESS;
 
+    int packet_type = readPacket(timer);    // read the socket, see what work is due
+
     switch (packet_type)
     {
-        case FAILURE:
-        case BUFFER_OVERFLOW:
+        default:
+            // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
             rc = packet_type;
+            goto exit;
+        case 0: // timed out reading packet
             break;
         case CONNACK:
         case PUBACK:
@@ -546,6 +598,7 @@
             MQTTString topicName = MQTTString_initializer;
             Message msg;
             int intQoS;
+            msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 goto exit;
@@ -587,8 +640,8 @@
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
-            else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, 
-                        (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
+            else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
+                                 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
                 rc = FAILURE;
             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
                 rc = FAILURE; // there was a problem
@@ -597,7 +650,7 @@
             if (packet_type == PUBREL)
                 freeQoS2msgid(mypacketid);
             break;
-            
+
         case PUBCOMP:
             break;
 #endif
@@ -605,10 +658,16 @@
             ping_outstanding = false;
             break;
     }
-    keepalive();
+
+    if (keepalive() != SUCCESS)
+        //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
+        rc = FAILURE;
+
 exit:
     if (rc == SUCCESS)
         rc = packet_type;
+    else if (isconnected)
+        closeSession();
     return rc;
 }
 
@@ -616,17 +675,22 @@
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
 {
-    int rc = FAILURE;
+    int rc = SUCCESS;
 
     if (keepAliveInterval == 0)
-    {
-        rc = SUCCESS;
         goto exit;
-    }
 
     if (last_sent.expired() || last_received.expired())
     {
-        if (!ping_outstanding)
+        if (ping_outstanding)
+        {
+            rc = FAILURE; // session failure
+            #if defined(MQTT_DEBUG)
+                char printbuf[150];
+                DEBUG("PINGRESP not received in keepalive interval\n");
+            #endif
+        }
+        else
         {
             Timer timer(1000);
             int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
@@ -650,15 +714,16 @@
     {
         if (timer.expired())
             break; // we timed out
+        rc = cycle(timer);
     }
-    while ((rc = cycle(timer)) != packet_type);
+    while (rc != packet_type && rc >= 0);
 
     return rc;
 }
 
 
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
 {
     Timer connect_timer(command_timeout_ms);
     int rc = FAILURE;
@@ -679,10 +744,11 @@
     // this will be a blocking call, wait for the connack
     if (waitfor(CONNACK, connect_timer) == CONNACK)
     {
-        unsigned char connack_rc = 255;
-        bool sessionPresent = false;
-        if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = connack_rc;
+        data.rc = 0;
+        data.sessionPresent = false;
+        if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
+                            (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+            rc = data.rc;
         else
             rc = FAILURE;
     }
@@ -710,12 +776,23 @@
 
 exit:
     if (rc == SUCCESS)
+    {
         isconnected = true;
+        ping_outstanding = false;
+    }
     return rc;
 }
 
 
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
+{
+    connackData data;
+    return connect(options, data);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
 {
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
@@ -724,7 +801,51 @@
 
 
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
+{
+    int rc = FAILURE;
+    int i = -1;
+
+    // first check for an existing matching slot
+    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+    {
+        if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
+        {
+            if (messageHandler == 0) // remove existing
+            {
+                messageHandlers[i].topicFilter = 0;
+                messageHandlers[i].fp.detach();
+            }
+            rc = SUCCESS; // return i when adding new subscription
+            break;
+        }
+    }
+    // if no existing, look for empty slot (unless we are removing)
+    if (messageHandler != 0) {
+        if (rc == FAILURE)
+        {
+            for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
+            {
+                if (messageHandlers[i].topicFilter == 0)
+                {
+                    rc = SUCCESS;
+                    break;
+                }
+            }
+        }
+        if (i < MAX_MESSAGE_HANDLERS)
+        {
+            messageHandlers[i].topicFilter = topicFilter;
+            messageHandlers[i].fp.attach(messageHandler);
+        }
+    }
+    return rc;
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
+     enum QoS qos, messageHandler messageHandler, subackData& data)
 {
     int rc = FAILURE;
     Timer timer(command_timeout_ms);
@@ -742,35 +863,34 @@
 
     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
     {
-        int count = 0, grantedQoS = -1;
+        int count = 0;
         unsigned short mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80
-        if (rc != 0x80)
+        data.grantedQoS = 0;
+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
         {
-            for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-            {
-                if (messageHandlers[i].topicFilter == 0)
-                {
-                    messageHandlers[i].topicFilter = topicFilter;
-                    messageHandlers[i].fp.attach(messageHandler);
-                    rc = 0;
-                    break;
-                }
-            }
+            if (data.grantedQoS != 0x80)
+                rc = setMessageHandler(topicFilter, messageHandler);
         }
     }
     else
         rc = FAILURE;
 
 exit:
-    if (rc != SUCCESS)
-        cleanSession();
+    if (rc == FAILURE)
+        closeSession();
     return rc;
 }
 
 
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+{
+    subackData data;
+    return subscribe(topicFilter, qos, messageHandler, data);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {
     int rc = FAILURE;
@@ -791,17 +911,8 @@
         unsigned short mypacketid;  // should be the same as the packetid above
         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
         {
-            rc = 0;
-
             // remove the subscription message handler associated with this topic, if there is one
-            for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-            {
-                if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
-                {
-                    messageHandlers[i].topicFilter = 0;
-                    break;
-                }
-            }
+            setMessageHandler(topicFilter, 0);
         }
     }
     else
@@ -809,7 +920,7 @@
 
 exit:
     if (rc != SUCCESS)
-        cleanSession();
+        closeSession();
     return rc;
 }
 
@@ -837,7 +948,8 @@
         else
             rc = FAILURE;
     }
-#elif MQTTCLIENT_QOS2
+#endif
+#if MQTTCLIENT_QOS2
     else if (qos == QOS2)
     {
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
@@ -856,7 +968,7 @@
 
 exit:
     if (rc != SUCCESS)
-        cleanSession();
+        closeSession();
     return rc;
 }
 
@@ -923,17 +1035,12 @@
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
 {
     int rc = FAILURE;
-    Timer timer(command_timeout_ms);            // we might wait for incomplete incoming publishes to complete
+    Timer timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
     int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
     if (len > 0)
         rc = sendPacket(len, timer);            // send the disconnect packet
-
-    if (cleansession)
-        cleanSession();
-    else
-        isconnected = false;
+    closeSession();
     return rc;
 }
 
-
-#endif
\ No newline at end of file
+#endif
--- a/MQTTPacket.lib	Mon Sep 25 11:12:23 2017 +0000
+++ b/MQTTPacket.lib	Mon Sep 25 12:06:28 2017 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/teams/mqtt/code/MQTTPacket/#62396c1620b6
+http://mbed.org/teams/mqtt/code/MQTTPacket/#1af97b41bc9d