An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Mon Apr 07 12:24:36 2014 +0000
Parent:
3:dbff6b768d28
Child:
5:389ccac5a50c
Commit message:
Templates for both networking and tasks

Changed in this revision

MQTTClient.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.cpp	Mon Mar 31 15:48:45 2014 +0000
+++ b/MQTTClient.cpp	Mon Apr 07 12:24:36 2014 +0000
@@ -23,26 +23,29 @@
 #include "MQTTClient.h"
 #include "MQTTPacket.h"
 
-MQTT::Client::Client(IPStack* ipstack, const int buffer_size)
+template<class Network, class Thread> MQTT::Client<Network, Thread>::Client(Network* network, const int buffer_size, const int command_timeout)
 {
     
    buf = new char[buffer_size];
    this->ipstack = ipstack;
+   this->command_timeout = command_timeout;
+   this->thread = new Thread(0); // only need a background thread for non-blocking mode
+   this->ipstack = network;
 }
 
 
-int MQTT::Client::sendPacket(int length)
+template<class Network, class Thread> int MQTT::Client<Network, Thread>::sendPacket(int length)
 {
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length);
+        sent += ipstack->write(&buf[sent], length, -1);
         
     return sent;
 }
 
 
-int MQTT::Client::decodePacket(int* value, int timeout)
+template<class Network, class Thread> int MQTT::Client<Network, Thread>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -70,10 +73,10 @@
 }
 
 
-int MQTT::Client::readPacket(int timeout) 
+template<class Network, class Thread> int MQTT::Client<Network,Thread>::readPacket(int timeout) 
 {
     int rc = -1;
-    MQTTHeader header;
+    MQTTHeader header = {0};
     int len = 0;
     int rem_len = 0;
 
@@ -97,35 +100,53 @@
 }
 
 
-void MQTT::Client::cycle()
+template<class Network, class Thread> int MQTT::Client<Network, Thread>::cycle()
 {
     int timeout = 1000L;
     /* get one piece of work off the wire and one pass through */
     
     // 1. read the socket, see what work is due. 
-    int packet_type = readPacket(buf, buflen, -1);
+    int packet_type = readPacket(-1);
     
+    switch (packet_type)
+    {
+        case CONNACK:
+            break;
+        case PUBACK:
+            break;
+        case SUBACK:
+            break;
+        case PUBREC:
+            break;
+        case PUBCOMP:
+            break;
+        case PINGRESP:
+            break;
+    }
+    return packet_type;
 }
 
 
-int MQTT::Client::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> resultHandler)
+template<class Network, class Thread> int MQTT::Client<Network, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
 {
     /* 1. connect to the server with the desired transport */
-    if (!ipstack->connect())
-        return -99;
+    /*if (!ipstack->connect())
+        return -99;*/
     
     /* 2. if the connect was successful, send the MQTT connect packet */        
     int len = MQTTSerialize_connect(buf, buflen, options);
     sendPacket(len); // send the connect packet
     
     /* 3. wait until the connack is received */
-    /* how to make this check work?
-    if (resultHandler == None)
+    if (resultHandler == 0)
     {
         // this will be a blocking call, wait for the connack
-        
-    }*/
+        //waitfor(CONNACK);    
+    }
+    else
+    {
+        // set connect response callback function
+    }
     
     return len;
 }
-
--- a/MQTTClient.h	Mon Mar 31 15:48:45 2014 +0000
+++ b/MQTTClient.h	Mon Apr 07 12:24:36 2014 +0000
@@ -23,25 +23,16 @@
 #if !defined(MQTTCLIENT_H)
 #define MQTTCLIENT_H
 
-#include <vector>
-
 #include "mbed.h"
 #include "FP.h"
 #include "MQTTPacket.h"
-#include "include_me.h"
 
 namespace MQTT
 {
 
-class Client;
 
 enum QoS { QOS0, QOS1, QOS2 };
 
-class Result
-{
-    /* success or failure result data */
-    Client* client;
-};
 
 struct Message
 {
@@ -53,38 +44,51 @@
     size_t payloadlen;
 };
 
+template<class Network, class Thread> class Client;
+
+class Result
+{
+    /* success or failure result data */
+    Client<class Network, class Thread>* client;
+};
+
   
-class Client
+template<class Network, class Thread> class Client
 {
     
 public:    
-
-    static FP<void, Result*> None;   // default argument of no result handler to indicate call should be blocking
-    
-    Client(IPStack* ipstack, const int buffer_size = 100); 
+   
+    Client(Network* network, const int buffer_size = 100, const int command_timeout = 30);  
        
-    int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> resultHandler = None);
+    int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0);
         
-    int publish(char* topic, Message* message, FP<void, Result*> resultHandler = None);
+    int publish(char* topic, Message* message, FP<void, Result*> *resultHandler = 0);
     
-    int subscribe(char* topicFilter, int qos, FP<void, Message*> messageHandler, FP<void, Result*> resultHandler = None);
+    int subscribe(char* topicFilter, int qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0);
     
-    int unsubscribe(char* topicFilter, FP<void, Result*> resultHandler = None);
+    int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0);
     
-    int disconnect(int timeout, FP<void, Result*> resultHandler = None);
+    int disconnect(int timeout, FP<void, Result*> *resultHandler = 0);
     
 private:
 
-    void cycle();
+    int cycle();
 
     int decodePacket(int* value, int timeout);
-    int readPacket(char* buf, int buflen, int timeout = -1);
+    int readPacket(int timeout = -1);
     int sendPacket(int length);
     
-    IPStack* ipstack;
+    Thread* thread;
+    Network* ipstack;
+    
     char* buf;
     int buflen;
     
+    char* readbuf;
+    int readbuflen;
+    
+    int command_timeout;
+    
 };
 
 }