An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Sun Apr 13 22:32:28 2014 +0000
Parent:
13:fd82db992024
Child:
16:91c2f9a144d4
Commit message:
I really want the arrays to be allocated in automatic storage

Changed in this revision

FP.lib Show annotated file Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.lib Show annotated file Show diff for this revision Revisions of this file
--- a/FP.lib	Fri Apr 11 22:46:37 2014 +0100
+++ b/FP.lib	Sun Apr 13 22:32:28 2014 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/users/sam_grove/code/FP/#bc7c28fe64b6
+http://mbed.org/users/sam_grove/code/FP/#e0f19cdaa46e
--- a/MQTTClient.h	Fri Apr 11 22:46:37 2014 +0100
+++ b/MQTTClient.h	Sun Apr 13 22:32:28 2014 +0000
@@ -45,7 +45,7 @@
 public:
     PacketId();
     
-    int getNext();
+    int getNext();
    
 private:
     static const int MAX_PACKET_ID = 65535;
@@ -57,16 +57,32 @@
 template<class Network, class Timer, class Thread> class Client
 {
     
-public:    
-
+public:    
+
 	struct Result
 	{
     	/* success or failure result data */
-    	Client<Network, Timer, Thread>* client;
+    	Client<Network, Timer, Thread>* client;
 		int connack_rc;
-	};
-
+	};
+
 	typedef void (*resultHandler)(Result*);
+	
+	struct limits
+	{
+		int MAX_MQTT_PACKET_SIZE; // 
+		int MAX_MESSAGE_HANDLERS;  // 5 - each subscription requires a message handler
+		int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
+		int command_timeout;
+		
+		limits()
+		{
+			MAX_MQTT_PACKET_SIZE = 100;
+			MAX_MESSAGE_HANDLERS = 5;
+			MAX_CONCURRENT_OPERATIONS= 5;
+			command_timeout = 30;	
+		}
+	};
    
     Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); 
        
@@ -87,61 +103,74 @@
     
 private:
 
-    int cycle(int timeout);
+    int cycle(int timeout);
 	int keepalive();
 
     int decodePacket(int* value, int timeout);
     int readPacket(int timeout = -1);
     int sendPacket(int length, int timeout = -1);
+	int deliverMessage(MQTTString* topic, Message* message);
     
     Thread* thread;
     Network* ipstack;
     Timer command_timer, ping_timer;
     
-    char* buf; 
+    char buf[];
     int buflen;
     
     char* readbuf;
-    int readbuflen;
-
-    unsigned int keepAliveInterval;
+    int readbuflen;
+
+    unsigned int keepAliveInterval;
 	bool ping_outstanding;
     
     int command_timeout; // max time to wait for any MQTT command to complete, in seconds
     PacketId packetid;
     
     typedef FP<void, Result*> resultHandlerFP;    
-    // how many concurrent operations should we allow?  Each one will require a function pointer
     resultHandlerFP connectHandler; 
     
     #define MAX_MESSAGE_HANDLERS 5
     typedef FP<void, Message*> messageHandlerFP;
-    messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS];  // Linked list, or constructor parameter to limit array size?
-
+    struct
+    {
+    	char* topic;
+    	messageHandlerFP fp;
+    } messageHandlers[MAX_MESSAGE_HANDLERS];  // Message handlers are linked to a subscription topic
+    
+    // how many concurrent operations should we allow?  Each one will require a function pointer
+    struct
+    {
+    	unsigned short id;
+    	resultHandlerFP fp;
+    	MQTTString* topic;  // if this is a publish, store topic name in case republishing is required
+    	Message* message;  // for publish, 
+    } *operations;  // result handlers are indexed by packet ids
+
 	static void threadfn(void* arg);
     
 };
 
-}
-
-
-template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
-{
-    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
 }
 
 
 template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout)  : packetid()
 {
-    
-   buf = new char[MAX_MQTT_PACKET_SIZE];
+   //buf = new char[MAX_MQTT_PACKET_SIZE];
    readbuf = new char[MAX_MQTT_PACKET_SIZE];
    buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
+   
    this->command_timeout = command_timeout;
    this->thread = 0;
    this->ipstack = network;
-   this->command_timer = Timer();
-   this->ping_timer = Timer();
+   this->command_timer = Timer();
+   this->ping_timer = Timer();
    this->ping_outstanding = 0;
 }
 
@@ -151,7 +180,7 @@
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length, -1);
+        sent += ipstack->write(&buf[sent], length, -1);
 	if (sent == length)
 	    ping_timer.reset(); // record the fact that we have successfully sent the packet    
     return sent;
@@ -163,7 +192,7 @@
     char c;
     int multiplier = 1;
     int len = 0;
-#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+	const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
 
     *value = 0;
     do
@@ -219,6 +248,11 @@
 }
 
 
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message)
+{
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
 {
     /* get one piece of work off the wire and one pass through */
@@ -227,116 +261,114 @@
     int packet_type = readPacket(timeout);
 
     printf("packet type %d\n", packet_type);
-    
+    
 	int len, rc;
     switch (packet_type)
     {
-        case CONNACK:
-			if (this->thread)
-			{
-				Result res = {this, 0};
-            	int connack_rc = -1;
+        case CONNACK:
+			if (this->thread)
+			{
+				Result res = {this, 0};
             	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
-                	;
-				connectHandler(&res);
-			}
+                	;
+				connectHandler(&res);
+				connectHandler.detach(); // only invoke the callback once
+			}
         case PUBACK:
         case SUBACK:
             break;
-        case PUBLISH:
-			MQTTString topicName;
-			Message msg;
-			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
-			if (msg.qos == QOS0)
-				messageHandlers[0](&msg);
+        case PUBLISH:
+			MQTTString topicName;
+			Message msg;
+			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+			if (msg.qos == QOS0)
+				deliverMessage(&topicName, &msg);
             break;
-        case PUBREC:
+        case PUBREC:
    	        int type, dup, mypacketid;
    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-   	            ; 
+   	            ; 
+   	        // must lock this access against the application thread, if we are multi-threaded
 			len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
-		    rc = sendPacket(len); // send the subscribe packet
-			if (rc != len) 
-				goto exit; // there was a problem
+		    rc = sendPacket(len); // send the subscribe packet
+			if (rc != len) 
+				goto exit; // there was a problem
 
             break;
         case PUBCOMP:
             break;
-        case PINGRESP:
-			if (ping_outstanding)
-				ping_outstanding = false;
-			//else disconnect();
+        case PINGRESP:
+			ping_outstanding = false;
             break;
-        case -1:
-            break;
-    }
-	keepalive();
+    }
+	keepalive();
 exit:
     return packet_type;
-}
-
-
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
-{
-	int rc = 0;
-
-	if (keepAliveInterval == 0)
-		goto exit;
-
-	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
-	{
-		if (ping_outstanding)
-			rc = -1;
-		else
-		{
-			int len = MQTTSerialize_pingreq(buf, buflen);
-			rc = sendPacket(len); // send the connect packet
-			if (rc != len) 
-				rc = -1; // indicate there's a problem
-			else
-				ping_outstanding = true;
-		}
-	}
-
-exit:
+{
+	int rc = 0;
+
+	if (keepAliveInterval == 0)
+		goto exit;
+
+	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+	{
+		if (ping_outstanding)
+			rc = -1;
+		else
+		{
+			int len = MQTTSerialize_pingreq(buf, buflen);
+			rc = sendPacket(len); // send the connect packet
+			if (rc != len) 
+				rc = -1; // indicate there's a problem
+			else
+				ping_outstanding = true;
+		}
+	}
+
+exit:
 	return rc;
 }
 
 
 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
-	while (true)
+{
+	while (true)
 		cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
         options = &default_options; // set default options if none were supplied
     
-    this->keepAliveInterval = options->keepAliveInterval;
+    this->keepAliveInterval = options->keepAliveInterval;
 	ping_timer.start();
     int len = MQTTSerialize_connect(buf, buflen, options);
-    int rc = sendPacket(len); // send the connect packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the connect packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     if (resultHandler == 0)     // wait until the connack is received 
-    {
-		if (command_timer.read_ms() > (command_timeout * 1000)) 
-			goto exit; // we timed out
+    {
         // this will be a blocking call, wait for the connack
-        if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
+		do
         {
-            int connack_rc = -1;
-            if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
-                rc = connack_rc;
-        }
+			if (command_timer.read_ms() > (command_timeout * 1000)) 
+				goto exit; // we timed out
+		}
+		while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
+        int connack_rc = -1;
+        if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
+	        rc = connack_rc;
     }
     else
     {
@@ -346,23 +378,23 @@
         // start background thread            
         this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
 }
 
 
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for suback */
@@ -381,23 +413,23 @@
         // set subscribe response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
-}
-
-
+}
+
+
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for suback */
@@ -416,57 +448,49 @@
         // set unsubscribe response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
-}
-
-
-   
+}
+
+
+   
 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
-{
-	command_timer.start();
+{
+	command_timer.start();
 
-    MQTTString topic = {(char*)topicName, 0, 0};
-
+    MQTTString topic = {(char*)topicName, 0, 0};
+
 	message->id = packetid.getNext();
-    
+    
 	int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
-    int rc = sendPacket(len); // send the subscribe packet
-	if (rc != len) 
+    int rc = sendPacket(len); // send the subscribe packet
+	if (rc != len) 
 		goto exit; // there was a problem
     
     /* wait for acks */
     if (resultHandler == 0)
     {
- 		if (message->qos == QOS1)
+ 		if (message->qos == QOS1)
 		{
 	        if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
     	    {
     	        int type, dup, mypacketid;
     	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
     	            rc = 0; 
-    	    }
-		}
-		else if (message->qos == QOS2)
-		{
-	        if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
-    	    {
-    	        int type, dup, mypacketid;
-    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-    	            rc = 0; 
-				len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
-			    rc = sendPacket(len); // send the subscribe packet
-				if (rc != len) 
-					goto exit; // there was a problem
-		        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
-	    	    {
-    	        	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
-    	            	rc = 0; 
-				}
-    	    }
+    	    }
+		}
+		else if (message->qos == QOS2)
+		{
+	        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+	   	    {
+	   	    	int type, dup, mypacketid;
+            	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+    	           	rc = 0; 
+			}
+
 		}
     }
     else
@@ -474,9 +498,9 @@
         // set publish response callback function
         
     }
-    
-exit:
-	command_timer.stop();
+    
+exit:
+	command_timer.stop();
 	command_timer.reset();
     return rc;
 }
--- a/MQTTPacket.lib	Fri Apr 11 22:46:37 2014 +0100
+++ b/MQTTPacket.lib	Sun Apr 13 22:32:28 2014 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/teams/mqtt/code/MQTTPacket/#bc3bc0e3b764
+http://mbed.org/teams/mqtt/code/MQTTPacket/#c502573c6016