An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Mon Apr 14 18:51:52 2014 +0000
Parent:
15:64a57183aa03
Child:
17:4f914fd9ee12
Commit message:
Add subscriptions

Changed in this revision

MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTTPacket.lib Show annotated file Show diff for this revision Revisions of this file
MQTTSingle.h Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.h	Sun Apr 13 22:32:28 2014 +0000
+++ b/MQTTClient.h	Mon Apr 14 18:51:52 2014 +0000
@@ -38,7 +38,6 @@
     size_t payloadlen;
 };
 
-template<class Network, class Timer, class Thread> class Client;
 
 class PacketId
 {
@@ -53,8 +52,25 @@
 };
 
 typedef void (*messageHandler)(Message*);
+
+typedef struct limits
+{
+	int MAX_MQTT_PACKET_SIZE; // 
+	int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
+	int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
+	int command_timeout;
+		
+	limits()
+	{
+		MAX_MQTT_PACKET_SIZE = 100;
+		MAX_MESSAGE_HANDLERS = 5;
+		MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
+		command_timeout = 30;
+	}
+} Limits;
   
-template<class Network, class Timer, class Thread> class Client
+  
+template<class Network, class Timer, class Thread, class Mutex> class Client
 {
     
 public:    
@@ -62,30 +78,14 @@
 	struct Result
 	{
     	/* success or failure result data */
-    	Client<Network, Timer, Thread>* client;
+    	Client<Network, Timer, Thread, Mutex>* client;
 		int connack_rc;
 	};
 
-	typedef void (*resultHandler)(Result*);
-	
-	struct limits
-	{
-		int MAX_MQTT_PACKET_SIZE; // 
-		int MAX_MESSAGE_HANDLERS;  // 5 - each subscription requires a message handler
-		int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
-		int command_timeout;
-		
-		limits()
-		{
-			MAX_MQTT_PACKET_SIZE = 100;
-			MAX_MESSAGE_HANDLERS = 5;
-			MAX_CONCURRENT_OPERATIONS= 5;
-			command_timeout = 30;	
-		}
-	};
+	typedef void (*resultHandler)(Result*);	
    
-    Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); 
-       
+    Client(Network* network, const Limits limits = Limits()); 
+           
     int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
     
      template<class T>
@@ -104,48 +104,48 @@
 private:
 
     int cycle(int timeout);
+    int waitfor(int packet_type, Timer& atimer);
 	int keepalive();
+	int findFreeOperation();
 
     int decodePacket(int* value, int timeout);
-    int readPacket(int timeout = -1);
-    int sendPacket(int length, int timeout = -1);
+    int readPacket(int timeout);
+    int sendPacket(int length, int timeout);
 	int deliverMessage(MQTTString* topic, Message* message);
     
     Thread* thread;
     Network* ipstack;
-    Timer command_timer, ping_timer;
+    
+    Limits limits;
     
-    char buf[];
-    int buflen;
-    
+    char* buf;  
     char* readbuf;
-    int readbuflen;
 
+    Timer ping_timer, connect_timer;
     unsigned int keepAliveInterval;
 	bool ping_outstanding;
     
-    int command_timeout; // max time to wait for any MQTT command to complete, in seconds
     PacketId packetid;
     
     typedef FP<void, Result*> resultHandlerFP;    
     resultHandlerFP connectHandler; 
     
-    #define MAX_MESSAGE_HANDLERS 5
     typedef FP<void, Message*> messageHandlerFP;
-    struct
+    struct MessageHandlers
     {
-    	char* topic;
+    	const char* topic;
     	messageHandlerFP fp;
-    } messageHandlers[MAX_MESSAGE_HANDLERS];  // Message handlers are linked to a subscription topic
+    } *messageHandlers;      // Message handlers are indexed by subscription topic
     
     // how many concurrent operations should we allow?  Each one will require a function pointer
-    struct
+    struct Operations
     {
     	unsigned short id;
     	resultHandlerFP fp;
-    	MQTTString* topic;  // if this is a publish, store topic name in case republishing is required
-    	Message* message;  // for publish, 
-    } *operations;  // result handlers are indexed by packet ids
+    	const char* topic;         // if this is a publish, store topic name in case republishing is required
+    	Message* message;    // for publish, 
+    	Timer timer;         // to check if the command has timed out
+    } *operations;           // result handlers are indexed by packet ids
 
 	static void threadfn(void* arg);
     
@@ -154,40 +154,44 @@
 }
 
 
-template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::threadfn(void* arg)
 {
-    ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+    ((Client<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
 }
 
 
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout)  : packetid()
+template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits limits)  : limits(limits), packetid()
 {
-   //buf = new char[MAX_MQTT_PACKET_SIZE];
-   readbuf = new char[MAX_MQTT_PACKET_SIZE];
-   buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
-   
-   this->command_timeout = command_timeout;
-   this->thread = 0;
-   this->ipstack = network;
-   this->command_timer = Timer();
-   this->ping_timer = Timer();
-   this->ping_outstanding = 0;
+	this->thread = 0;
+	this->ipstack = network;
+	this->ping_timer = Timer();
+	this->ping_outstanding = 0;
+	   
+	// How to make these memory allocations portable?  I was hoping to avoid the heap
+	buf = new char[limits.MAX_MQTT_PACKET_SIZE];
+	readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
+	this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
+	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
+		operations[i].id = 0;
+	this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
+	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+		messageHandlers[i].topic = 0;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
 {
     int sent = 0;
     
     while (sent < length)
-        sent += ipstack->write(&buf[sent], length, -1);
+        sent += ipstack->write(&buf[sent], length, timeout);
 	if (sent == length)
-	    ping_timer.reset(); // record the fact that we have successfully sent the packet    
+	    ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
     return sent;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
 {
     char c;
     int multiplier = 1;
@@ -221,7 +225,7 @@
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout) 
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
 {
     int rc = -1;
     MQTTHeader header = {0};
@@ -248,12 +252,26 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
 {
+	int rc = -1;
+
+	// we have to find the right message handler - indexed by topic
+	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+	{
+		if (messageHandlers[i].topic && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
+		{
+			messageHandlers[i].fp(message);
+			rc = 0;
+			break;
+		}
+	}
+	
+	return rc;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
 {
     /* get one piece of work off the wire and one pass through */
     
@@ -269,29 +287,32 @@
 			if (this->thread)
 			{
 				Result res = {this, 0};
-            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
+            	if (MQTTDeserialize_connack(&res.connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                 	;
 				connectHandler(&res);
 				connectHandler.detach(); // only invoke the callback once
 			}
+			break;
         case PUBACK:
+        	if (this->thread)
+        		; //call resultHandler
         case SUBACK:
             break;
         case PUBLISH:
 			MQTTString topicName;
 			Message msg;
 			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);
 			if (msg.qos == QOS0)
 				deliverMessage(&topicName, &msg);
             break;
         case PUBREC:
    	        int type, dup, mypacketid;
-   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
    	            ; 
    	        // must lock this access against the application thread, if we are multi-threaded
-			len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
-		    rc = sendPacket(len); // send the subscribe packet
+			len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
+		    rc = sendPacket(len, timeout); // send the PUBREL packet
 			if (rc != len) 
 				goto exit; // there was a problem
 
@@ -308,21 +329,21 @@
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
 {
 	int rc = 0;
 
 	if (keepAliveInterval == 0)
 		goto exit;
 
-	if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+	if (ping_timer.expired())
 	{
 		if (ping_outstanding)
 			rc = -1;
 		else
 		{
-			int len = MQTTSerialize_pingreq(buf, buflen);
-			rc = sendPacket(len); // send the connect packet
+			int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
+			rc = sendPacket(len, 1000); // send the ping packet
 			if (rc != len) 
 				rc = -1; // indicate there's a problem
 			else
@@ -335,40 +356,53 @@
 }
 
 
-template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
+template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::run(void const *argument)
 {
 	while (true)
-		cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
+		cycle(ping_timer.left_ms());
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+// only used in single-threaded mode where one command at a time is in process
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
 {
-	command_timer.start();
+	int rc = -1;
+	
+	do
+    {
+		if (atimer.expired()) 
+			break; // we timed out
+	}
+	while ((rc = cycle(atimer.left_ms())) != packet_type);	
+	
+	return rc;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+{
+	connect_timer.countdown(limits.command_timeout);
 
     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     if (options == 0)
         options = &default_options; // set default options if none were supplied
     
     this->keepAliveInterval = options->keepAliveInterval;
-	ping_timer.start();
-    int len = MQTTSerialize_connect(buf, buflen, options);
-    int rc = sendPacket(len); // send the connect packet
+	ping_timer.countdown(this->keepAliveInterval);
+    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
+    int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
 	if (rc != len) 
 		goto exit; // there was a problem
     
     if (resultHandler == 0)     // wait until the connack is received 
     {
         // this will be a blocking call, wait for the connack
-		do
-        {
-			if (command_timer.read_ms() > (command_timeout * 1000)) 
-				goto exit; // we timed out
-		}
-		while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
-        int connack_rc = -1;
-        if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
-	        rc = connack_rc;
+		if (waitfor(CONNACK, connect_timer) == CONNACK)
+		{
+        	int connack_rc = -1;
+        	if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
+	        	rc = connack_rc;
+	    }
     }
     else
     {
@@ -376,24 +410,41 @@
         connectHandler.attach(resultHandler);
         
         // start background thread            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
+        this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
     }
     
 exit:
-	command_timer.stop();
-	command_timer.reset();
     return rc;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::findFreeOperation()
 {
-	command_timer.start();
+	int found = -1;
+	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
+	{
+		if (operations[i].id == 0)
+		{
+			found = i;
+			break;
+		}
+	}
+	return found;
+}
 
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+{
+	int index = 0;
+	if (this->thread)
+		index = findFreeOperation();	
+	Timer& atimer = operations[index].timer;
+	
+	atimer.countdown(limits.command_timeout);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
-    int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    int rc = sendPacket(len); // send the subscribe packet
+    int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
 	if (rc != len) 
 		goto exit; // there was a problem
     
@@ -401,11 +452,24 @@
     if (resultHandler == 0)
     {
         // this will block
-        if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
+        if (waitfor(SUBACK, atimer) == SUBACK)
         {
             int count = 0, grantedQoS = -1, mypacketid;
-            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
+            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                 rc = grantedQoS; // 0, 1, 2 or 0x80 
+            if (rc != 0x80)
+            {
+            	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
+				{
+					if (messageHandlers[i].topic == 0)
+					{
+						messageHandlers[i].topic = topicFilter;
+						messageHandlers[i].fp.attach(messageHandler);
+						rc = 0;
+						break;
+					}
+				}
+            }
         }
     }
     else
@@ -415,28 +479,29 @@
     }
     
 exit:
-	command_timer.stop();
-	command_timer.reset();
     return rc;
 }
 
 
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
 {
-	command_timer.start();
+	int index = 0;
+	if (this->thread)
+		index = findFreeOperation();	
+	Timer& atimer = operations[index].timer;
 
     MQTTString topic = {(char*)topicFilter, 0, 0};
     
     int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
-    int rc = sendPacket(len); // send the subscribe packet
+    int rc = sendPacket(len, atimer); // send the subscribe packet
 	if (rc != len) 
 		goto exit; // there was a problem
     
-    /* wait for suback */
+    /* wait for unsuback */
     if (resultHandler == 0)
     {
         // this will block
-        if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
+        if (waitfor(UNSUBACK) == UNSUBACK)
         {
             int mypacketid;
             if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
@@ -457,7 +522,7 @@
 
 
    
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
 {
 	command_timer.start();
 
@@ -475,7 +540,7 @@
     {
  		if (message->qos == QOS1)
 		{
-	        if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
+	        if (waitfor(PUBACK) == PUBACK)
     	    {
     	        int type, dup, mypacketid;
     	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
@@ -484,7 +549,7 @@
 		}
 		else if (message->qos == QOS2)
 		{
-	        if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+	        if (waitfor(PUBCOMP) == PUBCOMP)
 	   	    {
 	   	    	int type, dup, mypacketid;
             	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
--- a/MQTTPacket.lib	Sun Apr 13 22:32:28 2014 +0000
+++ b/MQTTPacket.lib	Mon Apr 14 18:51:52 2014 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/teams/mqtt/code/MQTTPacket/#c502573c6016
+http://mbed.org/teams/mqtt/code/MQTTPacket/#eea71419676a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSingle.h	Mon Apr 14 18:51:52 2014 +0000
@@ -0,0 +1,428 @@
+/*******************************************************************************
+ * Copyright (c) 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+#if !defined(MQTTSINGLE_H)
+#define MQTTSINGLE_H
+
+#include "FP.h"
+#include "MQTTPacket.h"
+#include "stdio.h"
+
+namespace MQTT
+{
+
+
+enum QoS { QOS0, QOS1, QOS2 };
+
+
+struct Message
+{
+    enum QoS qos;
+    bool retained;
+    bool dup;
+    unsigned short id;
+    void *payload;
+    size_t payloadlen;
+};
+
+
+class PacketId
+{
+public:
+    PacketId();
+    
+    int getNext();
+   
+private:
+    static const int MAX_PACKET_ID = 65535;
+    int next;
+};
+
+typedef void (*messageHandler)(Message*);
+
+typedef struct limits
+{
+    int MAX_MQTT_PACKET_SIZE; // 
+    int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
+    int command_timeout;
+        
+    limits()
+    {
+        MAX_MQTT_PACKET_SIZE = 100;
+        MAX_MESSAGE_HANDLERS = 5;
+        command_timeout = 30;
+    }
+} Limits;
+  
+  
+template<class Network, class Timer> class Client
+{
+    
+public:    
+ 
+    Client(Network* network, Limits* limits = 0);
+       
+    int connect(MQTTPacket_connectData* options = 0);
+        
+    int publish(const char* topic, Message* message);
+    
+    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
+    
+    int unsubscribe(const char* topicFilter);
+    
+    int disconnect(int timeout);
+    
+private:
+
+    int cycle(int timeout);
+    int keepalive();
+
+    int decodePacket(int* value, int timeout);
+    int readPacket(int timeout = -1);
+    int sendPacket(int length, int timeout = -1);
+    int deliverMessage(MQTTString* topic, Message* message);
+    
+    Network* ipstack;
+    Timer command_timer, ping_timer;
+    
+    char* buf;
+    int buflen;
+    
+    char* readbuf;
+    int readbuflen;
+
+    unsigned int keepAliveInterval;
+    bool ping_outstanding;
+    
+    int command_timeout; // max time to wait for any MQTT command to complete, in seconds
+    PacketId packetid;
+    
+    typedef FP<void, Message*> messageHandlerFP;
+    struct MessageHandlers
+    {
+        char* topic;
+        messageHandlerFP fp;
+    } *messageHandlers;  // Message handlers are linked to a subscription topic
+    int messageHandlerCount;
+    
+};
+
+}
+
+template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits* limits)  : packetid()
+{
+    Limits default_limits = Limits();
+   
+    if (limits == 0)
+        limits = &default_limits;
+   
+    this->command_timeout = limits->command_timeout;
+    this->ipstack = network;
+    this->command_timer = Timer();
+    this->ping_timer = Timer();
+    this->ping_outstanding = 0;
+       
+    // How to make these memory allocations portable?  I was hoping to avoid the heap
+    buflen = readbuflen = limits->MAX_MQTT_PACKET_SIZE;
+    buf = new char[limits->MAX_MQTT_PACKET_SIZE];
+    readbuf = new char[limits->MAX_MQTT_PACKET_SIZE];
+    this->messageHandlers = new struct MessageHandlers[limits->MAX_MESSAGE_HANDLERS];
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
+{
+    int sent = 0;
+    
+    while (sent < length)
+        sent += ipstack->write(&buf[sent], length, -1);
+    if (sent == length)
+        ping_timer.reset(); // record the fact that we have successfully sent the packet    
+    return sent;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
+{
+    char c;
+    int multiplier = 1;
+    int len = 0;
+    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
+
+    *value = 0;
+    do
+    {
+        int rc = MQTTPACKET_READ_ERROR;
+
+        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+        {
+            rc = MQTTPACKET_READ_ERROR; /* bad data */
+            goto exit;
+        }
+        rc = ipstack->read(&c, 1, timeout);
+        if (rc != 1)
+            goto exit;
+        *value += (c & 127) * multiplier;
+        multiplier *= 128;
+    } while ((c & 128) != 0);
+exit:
+    return len;
+}
+
+
+/**
+ * If any read fails in this method, then we should disconnect from the network, as on reconnect
+ * the packets can be retried. 
+ * @param timeout the max time to wait for the packet read to complete, in milliseconds
+ * @return the MQTT packet type, or -1 if none
+ */
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
+{
+    int rc = -1;
+    MQTTHeader header = {0};
+    int len = 0;
+    int rem_len = 0;
+
+    /* 1. read the header byte.  This has the packet type in it */
+    if (ipstack->read(readbuf, 1, timeout) != 1)
+        goto exit;
+
+    len = 1;
+    /* 2. read the remaining length.  This is variable in itself */
+    decodePacket(&rem_len, timeout);
+    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
+
+    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+    if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
+        goto exit;
+
+    header.byte = readbuf[0];
+    rc = header.bits.type;
+exit:
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
+{
+    // we have to find the right message handler - indexed by topic
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
+{
+    /* get one piece of work off the wire and one pass through */
+    
+    // read the socket, see what work is due
+    int packet_type = readPacket(timeout);
+    
+    int len, rc;
+    switch (packet_type)
+    {
+        case CONNACK:
+        case PUBACK:
+        case SUBACK:
+            break;
+        case PUBLISH:
+            MQTTString topicName;
+            Message msg;
+            rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+            if (msg.qos == QOS0)
+                deliverMessage(&topicName, &msg);
+            break;
+        case PUBREC:
+            int type, dup, mypacketid;
+            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+                ; 
+            // must lock this access against the application thread, if we are multi-threaded
+            len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
+            rc = sendPacket(len); // send the subscribe packet
+            if (rc != len) 
+                goto exit; // there was a problem
+
+            break;
+        case PUBCOMP:
+            break;
+        case PINGRESP:
+            ping_outstanding = false;
+            break;
+    }
+    keepalive();
+exit:
+    return packet_type;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
+{
+    int rc = 0;
+
+    if (keepAliveInterval == 0)
+        goto exit;
+
+    if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+    {
+        if (ping_outstanding)
+            rc = -1;
+        else
+        {
+            int len = MQTTSerialize_pingreq(buf, buflen);
+            rc = sendPacket(len); // send the connect packet
+            if (rc != len) 
+                rc = -1; // indicate there's a problem
+            else
+                ping_outstanding = true;
+        }
+    }
+
+exit:
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
+{
+    command_timer.start();
+
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    if (options == 0)
+        options = &default_options; // set default options if none were supplied
+    
+    this->keepAliveInterval = options->keepAliveInterval;
+    ping_timer.start();
+    int len = MQTTSerialize_connect(buf, buflen, options);
+    int rc = sendPacket(len); // send the connect packet
+    if (rc != len) 
+        goto exit; // there was a problem
+    
+    // this will be a blocking call, wait for the connack
+    do
+    {
+        if (command_timer.read_ms() > (command_timeout * 1000)) 
+             goto exit; // we timed out
+    }
+    while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
+    int connack_rc = -1;
+    if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
+        rc = connack_rc;
+    
+exit:
+    command_timer.stop();
+    command_timer.reset();
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
+{
+    command_timer.start();
+
+    MQTTString topic = {(char*)topicFilter, 0, 0};
+    
+    int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+    int rc = sendPacket(len); // send the subscribe packet
+    if (rc != len) 
+        goto exit; // there was a problem
+    
+    // wait for suback - this will block
+    if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
+    {
+        int count = 0, grantedQoS = -1, mypacketid;
+        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
+            rc = grantedQoS; // 0, 1, 2 or 0x80 
+        if (rc != 0x80)
+    }
+
+exit:
+    command_timer.stop();
+    command_timer.reset();
+    return rc;
+}
+
+
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+{
+    command_timer.start();
+
+    MQTTString topic = {(char*)topicFilter, 0, 0};
+    
+    int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
+    int rc = sendPacket(len); // send the subscribe packet
+    if (rc != len) 
+        goto exit; // there was a problem
+    
+    // this will block
+    if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
+    {
+        int mypacketid;
+        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
+            rc = 0; 
+    }
+    
+exit:
+    command_timer.stop();
+    command_timer.reset();
+    return rc;
+}
+
+
+   
+template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+{
+    command_timer.start();
+
+    MQTTString topic = {(char*)topicName, 0, 0};
+
+    message->id = packetid.getNext();
+    
+    int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
+    int rc = sendPacket(len); // send the subscribe packet
+    if (rc != len) 
+        goto exit; // there was a problem
+    
+    /* wait for acks */
+    if (resultHandler == 0)
+    {
+        if (message->qos == QOS1)
+        {
+            if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
+            {
+                int type, dup, mypacketid;
+                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+                    rc = 0; 
+            }
+        }
+        else if (message->qos == QOS2)
+        {
+            if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+            {
+                int type, dup, mypacketid;
+                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+                    rc = 0; 
+            }
+
+        }
+    }
+    
+exit:
+    command_timer.stop();
+    command_timer.reset();
+    return rc;
+}
+
+
+#endif