An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Revision:
3:dbff6b768d28
Parent:
2:dcfdd2abfe71
Child:
4:4ef00243708e
--- a/MQTTClient.cpp	Fri Mar 28 13:39:25 2014 +0000
+++ b/MQTTClient.cpp	Mon Mar 31 15:48:45 2014 +0000
@@ -23,32 +23,102 @@
 #include "MQTTClient.h"
 #include "MQTTPacket.h"
 
-MQTTClient::MQTTClient(char* serverURI, char* clientId, const int buffer_size)
+MQTT::Client::Client(IPStack* ipstack, const int buffer_size)
 {
     
    buf = new char[buffer_size];
-   this->clientId = clientId;
-   this->serverURI = serverURI;
+   this->ipstack = ipstack;
+}
+
+
+int MQTT::Client::sendPacket(int length)
+{
+    int sent = 0;
+    
+    while (sent < length)
+        sent += ipstack->write(&buf[sent], length);
+        
+    return sent;
+}
+
+
+int MQTT::Client::decodePacket(int* value, int timeout)
+{
+    char c;
+    int multiplier = 1;
+    int len = 0;
+#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+
+    *value = 0;
+    do
+    {
+        int rc = MQTTPACKET_READ_ERROR;
+
+        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+        {
+            rc = MQTTPACKET_READ_ERROR; /* bad data */
+            goto exit;
+        }
+        rc = ipstack->read(&c, 1, timeout);
+        if (rc != 1)
+            goto exit;
+        *value += (c & 127) * multiplier;
+        multiplier *= 128;
+    } while ((c & 128) != 0);
+exit:
+    return len;
 }
 
 
-int MQTTClient::connect(MQTTConnectOptions* options, FP<void, MQTTResult*> resultHandler)
+int MQTT::Client::readPacket(int timeout) 
 {
-    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
-     
-    data.clientID.cstring = clientId;
-    if (options)
-    {
-        data.keepAliveInterval = options->keepAliveInterval;
-        data.cleansession = options->cleansession;
-        data.username.cstring = options->username;
-        data.password.cstring = options->password;
-    }
-     
-    int len = MQTTSerialize_connect(buf, buflen, &data);
+    int rc = -1;
+    MQTTHeader header;
+    int len = 0;
+    int rem_len = 0;
+
+    /* 1. read the header byte.  This has the packet type in it */
+    if ((rc = ipstack->read(readbuf, 1, timeout)) != 1)
+        goto exit;
+
+    len = 1;
+    /* 2. read the remaining length.  This is variable in itself */
+    decodePacket(&rem_len, timeout);
+    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
+
+    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+    if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len)
+        goto exit;
+
+    header.byte = buf[0];
+    rc = header.bits.type;
+exit:
+    return rc;
+}
+
+
+void MQTT::Client::cycle()
+{
+    int timeout = 1000L;
+    /* get one piece of work off the wire and one pass through */
     
-    sendPacket(buf, buflen); // send the connect packet
+    // 1. read the socket, see what work is due. 
+    int packet_type = readPacket(buf, buflen, -1);
     
+}
+
+
+int MQTT::Client::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> resultHandler)
+{
+    /* 1. connect to the server with the desired transport */
+    if (!ipstack->connect())
+        return -99;
+    
+    /* 2. if the connect was successful, send the MQTT connect packet */        
+    int len = MQTTSerialize_connect(buf, buflen, options);
+    sendPacket(len); // send the connect packet
+    
+    /* 3. wait until the connack is received */
     /* how to make this check work?
     if (resultHandler == None)
     {