An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Tue May 06 09:44:23 2014 +0000
Parent:
22:aadb79d29330
Child:
24:c56a5c7d2a52
Child:
26:2658bb87c53d
Commit message:
Allocate arrays from automatic storage

Changed in this revision

MQTTAsync.h Show annotated file Show diff for this revision Revisions of this file
MQTTClient.cpp Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTAsync.h	Wed Apr 30 13:03:45 2014 +0000
+++ b/MQTTAsync.h	Tue May 06 09:44:23 2014 +0000
@@ -604,4 +604,4 @@
 
 
 
-#endif
+#endif
\ No newline at end of file
--- a/MQTTClient.cpp	Wed Apr 30 13:03:45 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- *    http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- *   http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- *    Ian Craggs - initial API and implementation and/or initial documentation
- *******************************************************************************/
- 
-#include "MQTTClient.h"
-#include "MQTTPacket.h"
-
-MQTT::PacketId::PacketId()
-{
-	next = 0;
-}
-
-int MQTT::PacketId::getNext()
-{
-    return next = (next == MAX_PACKET_ID) ? 1 : ++next;
-}
-
--- a/MQTTClient.h	Wed Apr 30 13:03:45 2014 +0000
+++ b/MQTTClient.h	Tue May 06 09:44:23 2014 +0000
@@ -19,8 +19,13 @@
  TODO: 
  
  log messages - use macros
+ 
  define return code constants
  
+ call connectionLost at appropriate points - in sendPacket and readPacket
+ 
+ match wildcard topics
+ 
  */
 
 #if !defined(MQTTCLIENT_H)
@@ -36,6 +41,8 @@
 
 enum QoS { QOS0, QOS1, QOS2 };
 
+enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
+
 
 struct Message
 {
@@ -58,50 +65,36 @@
 class PacketId
 {
 public:
-    PacketId();
+    PacketId()
+    {
+        next = 0;
+    }
     
-    int getNext();
+    int getNext()
+    {
+        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+    }
    
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
-
-typedef void (*messageHandler)(Message*);
-
-typedef struct limits
-{
-    int MAX_MQTT_PACKET_SIZE; // 
-    int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
-    long command_timeout_ms;
-        
-    limits()
-    {
-        MAX_MQTT_PACKET_SIZE = 100;
-        MAX_MESSAGE_HANDLERS = 5;
-        command_timeout_ms = 30000;
-    }
-} Limits;
   
   
 /**
  * @class Client
  * @brief blocking, non-threaded MQTT client API
+ * 
+ * This version of the API blocks on all method calls, until they are complete.  This means that only one
+ * MQTT request can be in process at any one time.  
  * @param Network a network class which supports send, receive
  * @param Timer a timer class with the methods: 
  */ 
-template<class Network, class Timer> class Client
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
 {
     
 public:
 
-    /** Construct the client
-     *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
-     *      before calling MQTT connect
-     *  @param limits an instance of the Limit class - to alter limits as required
-     */
-    Client(Network* network, const Limits limits = Limits()); 
-    
     typedef struct
     {
         Client* client;
@@ -109,6 +102,14 @@
     } connectionLostInfo;
     
     typedef int (*connectionLostHandlers)(connectionLostInfo*);
+    typedef void (*messageHandler)(Message*);
+
+    /** Construct the client
+     *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
+     *      before calling MQTT connect
+     *  @param limits an instance of the Limit class - to alter limits as required
+     */
+    Client(Network& network, unsigned int command_timeout_ms = 30000); 
     
     /** Set the connection lost callback - called whenever the connection is lost and we should be connected
      *  @param clh - pointer to the callback function
@@ -162,8 +163,9 @@
     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
      *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
      *  received.
+     *  @param timeout_ms the time to wait, in milliseconds
      */
-    void yield(int timeout);
+    void yield(int timeout_ms = 1000);
     
 private:
 
@@ -176,12 +178,11 @@
     int sendPacket(int length, Timer& timer);
     int deliverMessage(MQTTString* topic, Message* message);
     
-    Network* ipstack;
+    Network& ipstack;
+    unsigned int command_timeout_ms;
     
-    Limits limits;
-    
-    char* buf;  
-    char* readbuf;
+    char buf[MAX_MQTT_PACKET_SIZE];  
+    char readbuf[MAX_MQTT_PACKET_SIZE];  
 
     Timer ping_timer;
     unsigned int keepAliveInterval;
@@ -194,12 +195,11 @@
     {
         const char* topic;
         messageHandlerFP fp;
-    } *messageHandlers;      // Message handlers are indexed by subscription topic
+    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
     
     messageHandlerFP defaultMessageHandler;
     
     typedef FP<int, connectionLostInfo*> connectionLostFP;
-    
     connectionLostFP connectionLostHandler;
     
 };
@@ -207,34 +207,46 @@
 }
 
 
-template<class Network, class Timer> MQTT::Client<Network, Timer>::Client(Network* network, Limits limits)  : limits(limits), packetid()
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
 {
-    this->ipstack = network;
-    this->ping_timer = Timer();
-    this->ping_outstanding = 0;
-       
-    // How to make these memory allocations portable?  I was hoping to avoid the heap
-    buf = new char[limits.MAX_MQTT_PACKET_SIZE];
-    readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
-    this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
-    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+    ping_timer = Timer();
+    ping_outstanding = 0;
+    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
         messageHandlers[i].topic = 0;
+    this->command_timeout_ms = command_timeout_ms;    
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::sendPacket(int length, Timer& timer)
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
 {
-    int sent = 0;
+    int rc = FAILURE, 
+        sent = 0;
     
-    while (sent < length)
-        sent += ipstack->write(&buf[sent], length, timer.left_ms());
+    while (sent < length && !timer.expired())
+    {
+        rc = ipstack.write(&buf[sent], length, timer.left_ms());
+        if (rc == -1)
+        {
+            connectionLostInfo info = {this, &ipstack};
+            connectionLostHandler(&info);
+        }
+        else
+            sent += rc;
+    }
     if (sent == length)
+    {
         ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
-    return sent;
+        rc = SUCCESS;
+    }
+    else
+        rc = FAILURE;
+    return rc;
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -251,7 +263,7 @@
             rc = MQTTPACKET_READ_ERROR; /* bad data */
             goto exit;
         }
-        rc = ipstack->read(&c, 1, timeout);
+        rc = ipstack.read(&c, 1, timeout);
         if (rc != 1)
             goto exit;
         *value += (c & 127) * multiplier;
@@ -268,7 +280,8 @@
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::readPacket(Timer& timer) 
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
 {
     int rc = -1;
     MQTTHeader header = {0};
@@ -276,7 +289,7 @@
     int rem_len = 0;
 
     /* 1. read the header byte.  This has the packet type in it */
-    if (ipstack->read(readbuf, 1, timer.left_ms()) != 1)
+    if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
         goto exit;
 
     len = 1;
@@ -285,7 +298,7 @@
     len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
 
     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack->read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
+    if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
         goto exit;
 
     header.byte = readbuf[0];
@@ -295,12 +308,13 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message)
 {
     int rc = -1;
 
     // we have to find the right message handler - indexed by topic
-    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
     {
         if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
         {
@@ -317,17 +331,19 @@
 
 
 
-template<class Network, class Timer> void MQTT::Client<Network, Timer>::yield(int timeout)
+template<class Network, class Timer, int a, int b> 
+void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
 {
     Timer timer = Timer();
     
-    timer.countdown_ms(timeout);
+    timer.countdown_ms(timeout_ms);
     while (!timer.expired())
         cycle(timer);
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::cycle(Timer& timer)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
 {
     /* get one piece of work off the wire and one pass through */
 
@@ -345,26 +361,24 @@
             MQTTString topicName;
             Message msg;
             rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
+                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);;
             deliverMessage(&topicName, &msg);
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
-                    len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
+                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
                 else if (msg.qos == QOS2)
-                    len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
-                rc = sendPacket(len, timer); 
-                if (rc != len) 
+                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
+                if ((rc = sendPacket(len, timer)) != SUCCESS)
                     goto exit; // there was a problem
             }
             break;
         case PUBREC:
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 ; 
-            len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
-            rc = sendPacket(len, timer); // send the PUBREL packet
-            if (rc != len) 
+            len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
+            if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
                 goto exit; // there was a problem
 
             break;
@@ -380,7 +394,8 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::keepalive()
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
 {
     int rc = 0;
 
@@ -394,9 +409,9 @@
         else
         {
             Timer timer = Timer(1000);
-            int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
+            int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
             rc = sendPacket(len, timer); // send the ping packet
-            if (rc != len) 
+            if (rc != SUCCESS) 
                 rc = -1; // indicate there's a problem
             else
                 ping_outstanding = true;
@@ -409,7 +424,8 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::waitfor(int packet_type, Timer& timer)
+template<class Network, class Timer, int a, int b> 
+int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
 {
     int rc = -1;
     
@@ -424,9 +440,10 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::connect(MQTTPacket_connectData* options)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
 {
-    Timer connect_timer = Timer(limits.command_timeout_ms);
+    Timer connect_timer = Timer(command_timeout_ms);
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
@@ -434,16 +451,16 @@
     
     this->keepAliveInterval = options->keepAliveInterval;
     ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
+    int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
     int rc = sendPacket(len, connect_timer); // send the connect packet
-    if (rc != len) 
+    if (rc != SUCCESS) 
         goto exit; // there was a problem
     
     // this will be a blocking call, wait for the connack
     if (waitfor(CONNACK, connect_timer) == CONNACK)
     {
         int connack_rc = -1;
-        if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = connack_rc;
     }
     
@@ -452,28 +469,29 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 { 
     int len = -1;
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int rc = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
     if (rc <= 0)
         goto exit;
     len = rc;
-    if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
     if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
     {
         int count = 0, grantedQoS = -1, mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = grantedQoS; // 0, 1, 2 or 0x80 
         if (rc != 0x80)
         {
-            for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+            for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
             {
                 if (messageHandlers[i].topic == 0)
                 {
@@ -491,24 +509,25 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::unsubscribe(const char* topicFilter)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
 {   
     int len = -1;
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int rc = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
+    int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
     if (rc <= 0)
         goto exit;
     len = rc;
-    if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit; // there was a problem
     
     if (waitfor(UNSUBACK, timer) == UNSUBACK)
     {
         int mypacketid;  // should be the same as the packetid above
-        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
             rc = 0; 
     }
     
@@ -518,19 +537,20 @@
 
 
    
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::publish(const char* topicName, Message* message)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
 {
-    Timer timer = Timer(limits.command_timeout_ms);
+    Timer timer = Timer(command_timeout_ms);
     
     MQTTString topicString = {(char*)topicName, 0, 0};
 
     if (message->qos == QOS1 || message->qos == QOS2)
         message->id = packetid.getNext();
     
-    int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
+    int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
               topicString, (char*)message->payload, message->payloadlen);
     int rc = sendPacket(len, timer); // send the subscribe packet
-    if (rc != len) 
+    if (rc != SUCCESS) 
         goto exit; // there was a problem
     
     if (message->qos == QOS1)
@@ -538,7 +558,7 @@
         if (waitfor(PUBACK, timer) == PUBACK)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 rc = 0; 
         }
     }
@@ -547,7 +567,7 @@
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
         {
             int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                 rc = 0; 
         }
     }
@@ -557,13 +577,14 @@
 }
 
 
-template<class Network, class Timer> int MQTT::Client<Network, Timer>::disconnect()
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
 {  
-    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
-    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
+    Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
+    int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     int rc = sendPacket(len, timer);   // send the disconnect packet
     
-    return (rc == len) ? 0 : -1;
+    return rc;
 }