An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Tue Apr 29 16:04:55 2014 +0000
Parent:
20:cad3d54d7ecf
Child:
22:aadb79d29330
Commit message:
Add more structure to the Async client

Changed in this revision

MQTTAsync.h Show annotated file Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTAsync.h	Mon Apr 28 16:07:51 2014 +0000
+++ b/MQTTAsync.h	Tue Apr 29 16:04:55 2014 +0000
@@ -14,8 +14,8 @@
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
 
-#if !defined(MQTTCLIENT_H)
-#define MQTTCLIENT_H
+#if !defined(MQTTASYNC_H)
+#define MQTTASYNC_H
 
 #include "FP.h"
 #include "MQTTPacket.h"
@@ -58,19 +58,19 @@
 	int MAX_MQTT_PACKET_SIZE; // 
 	int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
 	int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
-	int command_timeout;
+	int command_timeout_ms;
 		
 	limits()
 	{
 		MAX_MQTT_PACKET_SIZE = 100;
 		MAX_MESSAGE_HANDLERS = 5;
 		MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
-		command_timeout = 30;
+		command_timeout_ms = 30000;
 	}
 } Limits;
   
   
-template<class Network, class Timer, class Thread, class Mutex> class Client
+template<class Network, class Timer, class Thread, class Mutex> class Async
 {
     
 public:    
@@ -78,28 +78,50 @@
 	struct Result
 	{
     	/* success or failure result data */
-    	Client<Network, Timer, Thread, Mutex>* client;
-		int connack_rc;
+    	Async<Network, Timer, Thread, Mutex>* client;
+		int rc;
 	};
 
 	typedef void (*resultHandler)(Result*);	
    
-    Client(Network* network, const Limits limits = Limits()); 
+    Async(Network* network, const Limits limits = Limits()); 
+        
+    typedef struct
+    {
+        Async* client;
+        Network* network;
+    } connectionLostInfo;
+    
+    typedef int (*connectionLostHandlers)(connectionLostInfo*);
+    
+    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
+     *  @param clh - pointer to the callback function
+     */
+    void setConnectionLostHandler(connectionLostHandlers clh)
+    {
+        connectionLostHandler.attach(clh);
+    }
+    
+    /** Set the default message handling callback - used for any message which does not match a subscription message handler
+     *  @param mh - pointer to the callback function
+     */
+    void setDefaultMessageHandler(messageHandler mh)
+    {
+        defaultMessageHandler.attach(mh);
+    }
            
-    int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
+    int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
     
      template<class T>
-    int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0);  // alternative to pass in pointer to member function
+    int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0);  // alternative to pass in pointer to member function
         
-    int publish(const char* topic, Message* message, resultHandler rh = 0);
-    
-    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
+    int publish(resultHandler rh, const char* topic, Message* message);
     
-    int unsubscribe(const char* topicFilter, resultHandler rh = 0);
+    int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
     
-    int disconnect(int timeout, resultHandler rh = 0);
+    int unsubscribe(resultHandler rh, const char* topicFilter);
     
-	void yield(int timeout);
+    int disconnect(resultHandler rh);
     
 private:
 
@@ -149,19 +171,25 @@
     } *operations;           // result handlers are indexed by packet ids
 
 	static void threadfn(void* arg);
+	
+	messageHandlerFP defaultMessageHandler;
+    
+    typedef FP<int, connectionLostInfo*> connectionLostFP;
+    
+    connectionLostFP connectionLostHandler;
     
 };
 
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
 {
-    ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
+    ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits)  : limits(limits), packetid()
+template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits)  : limits(limits), packetid()
 {
 	this->thread = 0;
 	this->ipstack = network;
@@ -180,7 +208,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
 {
     int sent = 0;
     
@@ -192,7 +220,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -226,7 +254,7 @@
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
 {
     int rc = -1;
     MQTTHeader header = {0};
@@ -253,7 +281,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
 {
 	int rc = -1;
 
@@ -273,17 +301,7 @@
 
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout)
-{
-	Timer atimer = Timer();
-	
-	atimer.countdown_ms(timeout);
-	while (!atimer.expired())
-		cycle(atimer.left_ms());
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
 {
     /* get one piece of work off the wire and one pass through */
 
@@ -297,7 +315,7 @@
 			if (this->thread)
 			{
 				Result res = {this, 0};
-            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+            	if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                 	;
 				connectHandler(&res);
 				connectHandler.detach(); // only invoke the callback once
@@ -339,7 +357,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
 {
 	int rc = 0;
 
@@ -366,7 +384,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
 {
 	while (true)
 		cycle(ping_timer.left_ms());
@@ -374,7 +392,7 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
 {
 	int rc = -1;
 	
@@ -389,9 +407,9 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
 {
-	connect_timer.countdown(limits.command_timeout);
+	connect_timer.countdown(limits.command_timeout_ms);
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
@@ -420,7 +438,7 @@
         connectHandler.attach(resultHandler);
         
         // start background thread            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
+        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
     }
     
 exit:
@@ -428,7 +446,7 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
 {
 	int found = -1;
 	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
@@ -443,14 +461,14 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 	
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
@@ -493,14 +511,14 @@
 }
 
 
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
@@ -508,22 +526,8 @@
 	if (rc != len) 
 		goto exit; // there was a problem
     
-    /* wait for unsuback */
-    if (resultHandler == 0)
-    {
-        // this will block
-        if (waitfor(UNSUBACK) == UNSUBACK)
-        {
-            int mypacketid;
-            if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-                rc = 0; 
-        }
-    }
-    else
-    {
-        // set unsubscribe response callback function
+    // set unsubscribe response callback function
         
-    }
     
 exit:
     return rc;
@@ -531,14 +535,14 @@
 
 
    
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
 {
 	int index = 0;
 	if (this->thread)
 		index = findFreeOperation();	
 	Timer& atimer = operations[index].timer;
 
-	atimer.countdown(limits.command_timeout);
+	atimer.countdown(limits.command_timeout_ms);
     MQTTString topic = {(char*)topicName, 0, 0};
 
 	if (message->qos == QOS1 || message->qos == QOS2)
@@ -583,4 +587,15 @@
 }
 
 
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
+{  
+    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
+    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
+    int rc = sendPacket(len, timer.left_ms());   // send the disconnect packet
+    
+    return (rc == len) ? 0 : -1;
+}
+
+
+
 #endif
--- a/MQTTClient.h	Mon Apr 28 16:07:51 2014 +0000
+++ b/MQTTClient.h	Tue Apr 29 16:04:55 2014 +0000
@@ -13,6 +13,15 @@
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
+ 
+ /*
+ 
+ TODO: 
+ 
+ log messages - use macros
+ define return code constants
+ 
+ */
 
 #if !defined(MQTTCLIENT_H)
 #define MQTTCLIENT_H
@@ -75,6 +84,12 @@
 } Limits;
   
   
+/**
+ * @class Client
+ * @brief blocking, non-threaded MQTT Client API
+ * @param Network a network class which supports send, receive
+ * @param Timer a timer class with the methods: 
+ */ 
 template<class Network, class Timer> class Client
 {
     
@@ -548,7 +563,7 @@
     int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
     int rc = sendPacket(len, timer);   // send the disconnect packet
     
-    return rc;
+    return (rc == len) ? 0 : -1;
 }