An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
Ian Craggs
Date:
Fri Apr 11 22:31:55 2014 +0100
Parent:
10:68a4ada53367
Child:
13:fd82db992024
Commit message:
Unsubscribe, publish

Changed in this revision

MQTTClient.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.cpp	Thu Apr 10 15:19:08 2014 +0000
+++ b/MQTTClient.cpp	Fri Apr 11 22:31:55 2014 +0100
@@ -17,13 +17,6 @@
 #include "MQTTClient.h"
 #include "MQTTPacket.h"
 
-
-void MQTT::threadfn(void* arg)
-{
-   ((Client<Network, Timer, Thread>*) arg)->run(NULL);
-}
-
-
 MQTT::PacketId::PacketId()
 {
 	next = 0;
@@ -32,4 +25,5 @@
 int MQTT::PacketId::getNext()
 {
     return next = (next == MAX_PACKET_ID) ? 1 : ++next;
-}
\ No newline at end of file
+}
+
--- a/MQTTClient.h	Thu Apr 10 15:19:08 2014 +0000
+++ b/MQTTClient.h	Fri Apr 11 22:31:55 2014 +0100
@@ -33,41 +33,42 @@
     enum QoS qos;
     bool retained;
     bool dup;
-    unsigned short msgid;
+    unsigned short id;
     void *payload;
     size_t payloadlen;
 };
 
 template<class Network, class Timer, class Thread> class Client;
 
-class Result
-{
-    /* success or failure result data */
-    Client<class Network, class Timer, class Thread>* client;
-};
-
-
 class PacketId
 {
 public:
     PacketId();
     
-    int getNext();
-    
+    int getNext();
+   
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
 
-typedef void (*resultHandler)(Result*);
 typedef void (*messageHandler)(Message*);
   
 template<class Network, class Timer, class Thread> class Client
 {
     
-public:    
+public:    
+
+	struct Result
+	{
+    	/* success or failure result data */
+    	Client<Network, Timer, Thread>* client;
+		int connack_rc;
+	};
+
+	typedef void (*resultHandler)(Result*);
    
-    Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); 
+    Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); 
        
     int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
     
@@ -78,7 +79,7 @@
     
     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
     
-    int unsubscribe(char* topicFilter, resultHandler rh = 0);
+    int unsubscribe(const char* topicFilter, resultHandler rh = 0);
     
     int disconnect(int timeout, resultHandler rh = 0);
     
@@ -86,7 +87,8 @@
     
 private:
 
-    int cycle();
+    int cycle(int timeout);
+	int keepalive();
 
     int decodePacket(int* value, int timeout);
     int readPacket(int timeout = -1);
@@ -94,16 +96,18 @@
     
     Thread* thread;
     Network* ipstack;
-    Timer* timer;
+    Timer command_timer, ping_timer;
     
     char* buf; 
     int buflen;
     
     char* readbuf;
-    int readbuflen;
+    int readbuflen;
+
+    unsigned int keepAliveInterval;
+	bool ping_outstanding;
     
     int command_timeout; // max time to wait for any MQTT command to complete, in seconds
-    int keepalive;
     PacketId packetid;
     
     typedef FP<void, Result*> resultHandlerFP;    
@@ -112,24 +116,33 @@
     
     #define MAX_MESSAGE_HANDLERS 5
     typedef FP<void, Message*> messageHandlerFP;
-    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
+    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
+
+	static void threadfn(void* arg);
     
 };
 
-void threadfn(void* arg);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
 
-}
 
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)  : packetid()
+template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout)  : packetid()
 {
     
-   buf = new char[buffer_size];
-   readbuf = new char[buffer_size];
-   buflen = readbuflen = buffer_size;
+   buf = new char[MAX_MQTT_PACKET_SIZE];
+   readbuf = new char[MAX_MQTT_PACKET_SIZE];
+   buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
    this->command_timeout = command_timeout;
    this->thread = 0;
    this->ipstack = network;
-   this->timer = timer;
+   this->command_timer = Timer();
+   this->ping_timer = Timer();
+   this->ping_outstanding = 0;
 }
 
 
@@ -138,8 +151,9 @@
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length, -1);
-        
+        sent += ipstack->write(&buf[sent], length, -1);
+	if (sent == length)
+	    ping_timer.reset(); // record the fact that we have successfully sent the packet    
     return sent;
 }
 
@@ -205,69 +219,119 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
 {
-    int timeout = -1;
     /* get one piece of work off the wire and one pass through */
     
-    // 1. read the socket, see what work is due. 
+    // read the socket, see what work is due
     int packet_type = readPacket(timeout);
 
     printf("packet type %d\n", packet_type);
-    
+    
+	int len, rc;
     switch (packet_type)
     {
-        case CONNACK:
-            printf("connack received\n");
-            break;
-        case PUBLISH:
-            break;
+        case CONNACK:
+			if (this->thread)
+			{
+				Result res = {this, 0};
+            	int connack_rc = -1;
+            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
+                	;
+				connectHandler(&res);
+			}
         case PUBACK:
-            break;
         case SUBACK:
             break;
-        case PUBREC:
+        case PUBLISH:
+			MQTTString topicName;
+			Message msg;
+			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+			if (msg.qos == QOS0)
+				messageHandlers[0](&msg);
+            break;
+        case PUBREC:
+   	        int type, dup, mypacketid;
+   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+   	            ; 
+			len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
+		    rc = sendPacket(len); // send the subscribe packet
+			if (rc != len) 
+				goto exit; // there was a problem
+
             break;
         case PUBCOMP:
             break;
-        case PINGRESP:
+        case PINGRESP:
+			if (ping_outstanding)
+				ping_outstanding = false;
+			//else disconnect();
             break;
         case -1:
             break;
-    }
+    }
+	keepalive();
+exit:
     return packet_type;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
+{
+	int rc = 0;
+
+	if (keepAliveInterval == 0)
+		goto exit;
+
+	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+	{
+		if (ping_outstanding)
+			rc = -1;
+		else
+		{
+			int len = MQTTSerialize_pingreq(buf, buflen);
+			rc = sendPacket(len); // send the connect packet
+			if (rc != len) 
+				rc = -1; // indicate there's a problem
+			else
+				ping_outstanding = true;
+		}
+	}
+
+exit:
+	return rc;
 }
 
 
 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
+{
+	while (true)
+		cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
-    int len = 0;
-    int rc = -99;
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+{
+	command_timer.start();
 
-    /* 2. if the connect was successful, send the MQTT connect packet */   
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
-    {
-        default_options.clientID.cstring = "me";
-        options = &default_options;
-    }
+        options = &default_options; // set default options if none were supplied
     
-    this->keepalive = options->keepAliveInterval;
-    len = MQTTSerialize_connect(buf, buflen, options);
-    printf("len from send is %d %d\n", len, buflen);
-    rc = sendPacket(len); // send the connect packet
-    printf("rc from send is %d\n", rc);
+    this->keepAliveInterval = options->keepAliveInterval;
+	ping_timer.start();
+    int len = MQTTSerialize_connect(buf, buflen, options);
+    int rc = sendPacket(len); // send the connect packet
+	if (rc != len) 
+		goto exit; // there was a problem
     
-    /* 3. wait until the connack is received */
-    if (resultHandler == 0)
-    {
+    if (resultHandler == 0)     // wait until the connack is received 
+    {
+		if (command_timer.read_ms() > (command_timeout * 1000)) 
+			goto exit; // we timed out
         // this will be a blocking call, wait for the connack
-        if (cycle() == CONNACK)
+        if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
         {
             int connack_rc = -1;
             if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
@@ -279,30 +343,33 @@
         // set connect response callback function
         connectHandler.attach(resultHandler);
         
-        // start background thread
-            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::threadfn, (void*)this);
+        // start background thread            
+        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
     }
-    
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
     return rc;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, 
-    messageHandler messageHandler, resultHandler resultHandler)
-{
-    int rc = -1, 
-        len = 0;
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+{
+	command_timer.start();
+
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    rc = sendPacket(len); // send the subscribe packet
+    int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
     
     /* wait for suback */
     if (resultHandler == 0)
     {
         // this will block
-        if (cycle() == SUBACK)
+        if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
         {
             int count = 0, grantedQoS = -1, mypacketid;
             if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
@@ -314,7 +381,103 @@
         // set subscribe response callback function
         
     }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+{
+	command_timer.start();
+
+    MQTTString topic = {(char*)topicFilter, 0, 0};
     
+    int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
+    
+    /* wait for suback */
+    if (resultHandler == 0)
+    {
+        // this will block
+        if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
+        {
+            int mypacketid;
+            if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
+                rc = 0; 
+        }
+    }
+    else
+    {
+        // set unsubscribe response callback function
+        
+    }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
+    return rc;
+}
+
+
+   
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+{
+	command_timer.start();
+
+    MQTTString topic = {(char*)topicName, 0, 0};
+
+	message->id = packetid.getNext();
+    
+	int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
+		goto exit; // there was a problem
+    
+    /* wait for acks */
+    if (resultHandler == 0)
+    {
+ 		if (message->qos == QOS1)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
+    	    {
+    	        int type, dup, mypacketid;
+    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            rc = 0; 
+    	    }
+		}
+		else if (message->qos == QOS2)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
+    	    {
+    	        int type, dup, mypacketid;
+    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            rc = 0; 
+				len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
+			    rc = sendPacket(len); // send the subscribe packet
+				if (rc != len) 
+					goto exit; // there was a problem
+		        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+	    	    {
+    	        	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	            	rc = 0; 
+				}
+    	    }
+		}
+    }
+    else
+    {
+        // set publish response callback function
+        
+    }
+    
+exit:
+	command_timer.stop();
+	command_timer.reset();
     return rc;
 }