A program for IoT demonstration with mbed, EnOcean and MQTT.

Dependencies:   IBMIoTClientEthernetExample C12832 EthernetInterface MQTT USB400Serial USBHost mbed

Fork of IBMIoTClientEthernetExample by IBM Watson IoT

Files at this revision

API Documentation at this revision

Comitter:
icraggs
Date:
Wed Oct 01 13:27:35 2014 +0000
Parent:
7:63a7aa4deaf8
Child:
9:58eb378727d9
Commit message:
Remove conditional compilation for IBM IoT settings

Changed in this revision

K64F.h Show annotated file Show diff for this revision Revisions of this file
LPC1768.h Show annotated file Show diff for this revision Revisions of this file
MQTT/MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
MQTT/MQTTEthernet.h Show annotated file Show diff for this revision Revisions of this file
MQTT/MQTTLogging.h Show annotated file Show diff for this revision Revisions of this file
MQTT/MQTTSocket.h Show annotated file Show diff for this revision Revisions of this file
MQTT/MQTT_logging.h Show diff for this revision Revisions of this file
MQTT/MQTT_mbed.h Show diff for this revision Revisions of this file
MQTT/MQTTmbed.h Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
--- a/K64F.h	Wed Aug 20 13:46:35 2014 +0000
+++ b/K64F.h	Wed Oct 01 13:27:35 2014 +0000
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial implementation
+ *    Sam Grove  - added method to check the status of the Ethernet cable 
  *******************************************************************************/
 
 #if !defined(K64F_H)
@@ -32,4 +33,11 @@
 
 #define DEFAULT_TYPE_NAME "iotsample-mbed-k64f"
 
+//#include "lpc_phy.h"
+// need a wrapper since K64F and LPC1768 wont have the same name for mii read methods
+static uint32_t linkStatus(void)
+{
+    return (1);
+}
+
 #endif
\ No newline at end of file
--- a/LPC1768.h	Wed Aug 20 13:46:35 2014 +0000
+++ b/LPC1768.h	Wed Oct 01 13:27:35 2014 +0000
@@ -12,6 +12,7 @@
  *
  * Contributors:
  *    Ian Craggs - initial implementation
+ *    Sam Grove  - added mehtod to check the status of the Ethernet cable
  *******************************************************************************/
 
 #if !defined(LPC1768_H)
@@ -19,15 +20,29 @@
 
 C12832 lcd(p5, p7, p6, p8, p11);
 DigitalOut led2(LED2);
-PwmOut r(p23); PwmOut g(p24); PwmOut b(p25);
+PwmOut r(p23);
+PwmOut g(p24);
+PwmOut b(p25);
 MMA7660 MMA(p28, p27);
 LM75B sensor(p28, p27);
-DigitalIn Down(p12); DigitalIn Left(p13); DigitalIn Click(p14); DigitalIn Up(p15); DigitalIn Right(p16);
-AnalogIn ain1(p19); AnalogIn ain2(p20);
+DigitalIn Down(p12);
+DigitalIn Left(p13);
+DigitalIn Click(p14);
+DigitalIn Up(p15);
+DigitalIn Right(p16);
+AnalogIn ain1(p19);
+AnalogIn ain2(p20);
 
 #define LED2_OFF 0
 #define LED2_ON 1
 
 #define DEFAULT_TYPE_NAME "iotsample-mbed-lpc1768"
 
+#include "lpc_phy.h"
+// need a wrapper since K64F and LPC1768 wont have the same name for mii read methods
+static uint32_t linkStatus(void)
+{
+    return (lpc_mii_read_data() & 1);
+}
+
 #endif
\ No newline at end of file
--- a/MQTT/MQTTClient.h	Wed Aug 20 13:46:35 2014 +0000
+++ b/MQTT/MQTTClient.h	Wed Oct 01 13:27:35 2014 +0000
@@ -13,14 +13,6 @@
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
- 
- /*
- 
- TODO: 
- 
- ensure publish packets are retried on reconnect
- 
- */
 
 #if !defined(MQTTCLIENT_H)
 #define MQTTCLIENT_H
@@ -28,7 +20,14 @@
 #include "FP.h"
 #include "MQTTPacket.h"
 #include "stdio.h"
-#include "MQTT_logging.h"
+#include "MQTTLogging.h"
+
+#if !defined(MQTTCLIENT_QOS1)
+    #define MQTTCLIENT_QOS1 1
+#endif
+#if !defined(MQTTCLIENT_QOS2)
+    #define MQTTCLIENT_QOS2 0
+#endif
 
 namespace MQTT
 {
@@ -55,7 +54,7 @@
 {
     MessageData(MQTTString &aTopicName, struct Message &aMessage)  : message(aMessage), topicName(aTopicName)
     { }
-    
+
     struct Message &message;
     MQTTString &topicName;
 };
@@ -68,44 +67,33 @@
     {
         next = 0;
     }
-    
+
     int getNext()
     {
         return next = (next == MAX_PACKET_ID) ? 1 : ++next;
     }
-   
+
 private:
     static const int MAX_PACKET_ID = 65535;
     int next;
 };
 
 
-class QoS2
-{
-public:
-
-    
-private:
-
-
-};
-  
-  
 /**
  * @class Client
  * @brief blocking, non-threaded MQTT client API
- * 
+ *
  * This version of the API blocks on all method calls, until they are complete.  This means that only one
- * MQTT request can be in process at any one time.  
+ * MQTT request can be in process at any one time.
  * @param Network a network class which supports send, receive
- * @param Timer a timer class with the methods: 
- */ 
+ * @param Timer a timer class with the methods:
+ */
 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
 class Client
 {
-    
+
 public:
-   
+
     typedef void (*messageHandler)(MessageData&);
 
     /** Construct the client
@@ -113,8 +101,8 @@
      *      before calling MQTT connect
      *  @param limits an instance of the Limit class - to alter limits as required
      */
-    Client(Network& network, unsigned int command_timeout_ms = 30000); 
-    
+    Client(Network& network, unsigned int command_timeout_ms = 30000);
+
     /** Set the default message handling callback - used for any message which does not match a subscription message handler
      *  @param mh - pointer to the callback function
      */
@@ -122,95 +110,135 @@
     {
         defaultMessageHandler.attach(mh);
     }
-    
-    void setLogHandler()
-    {
-        logHandler.attach(lh);
-    }
 
+    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
+     *  The nework object must be connected to the network endpoint before calling this
+     *  Default connect options are used
+     *  @return success code -
+     */
+    int connect();
     
     /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
-     *  The nework object must be connected to the network endpoint before calling this 
+     *  The nework object must be connected to the network endpoint before calling this
      *  @param options - connect options
-     *  @return success code -  
-     */       
-    int connect(MQTTPacket_connectData* options = 0);
-      
+     *  @return success code -
+     */
+    int connect(MQTTPacket_connectData& options);
+
     /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
      *  @param topic - the topic to publish to
      *  @param message - the message to send
-     *  @return success code -  
-     */      
-    int publish(const char* topicName, Message* message);
-   
+     *  @return success code -
+     */
+    int publish(const char* topicName, Message& message);
+    
+    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
+     *  @param topic - the topic to publish to
+     *  @param payload - the data to send
+     *  @param payloadlen - the length of the data
+     *  @param qos - the QoS to send the publish at
+     *  @param retained - whether the message should be retained
+     *  @return success code -
+     */
+    int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
+    
+    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
+     *  @param topic - the topic to publish to
+     *  @param payload - the data to send
+     *  @param payloadlen - the length of the data
+     *  @param id - the packet id used - returned 
+     *  @param qos - the QoS to send the publish at
+     *  @param retained - whether the message should be retained
+     *  @return success code -
+     */
+    int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
+
     /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
      *  @param topicFilter - a topic pattern which can include wildcards
      *  @param qos - the MQTT QoS to subscribe at
      *  @param mh - the callback function to be invoked when a message is received for this subscription
-     *  @return success code -  
-     */   
+     *  @return success code -
+     */
     int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
-    
+
     /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
      *  @param topicFilter - a topic pattern which can include wildcards
-     *  @return success code -  
-     */   
+     *  @return success code -
+     */
     int unsubscribe(const char* topicFilter);
-    
+
     /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
-     *  @return success code -  
+     *  @return success code -
      */
     int disconnect();
-    
+
     /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
-     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
+     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be
      *  received.
      *  @param timeout_ms the time to wait, in milliseconds
      *  @return success code - on failure, this means the client has disconnected
      */
-    int yield(int timeout_ms = 1000);
-    
+    int yield(unsigned long timeout_ms = 1000L);
+
+    /** Is the client connected?
+     *  @return flag - is the client connected or not?
+     */
+    bool isConnected()
+    {
+        return isconnected;
+    }
+
 private:
 
     int cycle(Timer& timer);
     int waitfor(int packet_type, Timer& timer);
     int keepalive();
+    int publish(int len, Timer& timer, enum QoS qos);
 
     int decodePacket(int* value, int timeout);
     int readPacket(Timer& timer);
     int sendPacket(int length, Timer& timer);
     int deliverMessage(MQTTString& topicName, Message& message);
     bool isTopicMatched(char* topicFilter, MQTTString& topicName);
-    
+
     Network& ipstack;
-    unsigned int command_timeout_ms;
-    
-    unsigned char buf[MAX_MQTT_PACKET_SIZE];  
-    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];  
+    unsigned long command_timeout_ms;
 
-    Timer ping_timer;
+    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+    Timer last_sent, last_received;
     unsigned int keepAliveInterval;
     bool ping_outstanding;
-    
+    bool cleansession;
+
     PacketId packetid;
-    
+
     struct MessageHandlers
     {
         const char* topicFilter;
         FP<void, MessageData&> fp;
     } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
-    
+
     FP<void, MessageData&> defaultMessageHandler;
-     
+
     bool isconnected;
-    
-#if 0
-    struct
-    {
-      bool used;
-      int id;  
-    } QoS2messages[MAX_QOS2_MESSAGES];
-    
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    unsigned char pubbuf[MAX_MQTT_PACKET_SIZE];  // store the last publish for sending on reconnect
+    int inflightLen;
+    unsigned short inflightMsgid;
+    enum QoS inflightQoS;
+#endif
+
+#if MQTTCLIENT_QOS2
+    bool pubrel;
+    #if !defined(MAX_INCOMING_QOS2_MESSAGES)
+        #define MAX_INCOMING_QOS2_MESSAGES 10
+    #endif
+    unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
+    bool isQoS2msgidFree(unsigned short id);
+    bool useQoS2msgid(unsigned short id);
 #endif
 
 };
@@ -218,45 +246,90 @@
 }
 
 
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
 {
-    ping_timer = Timer();
-    ping_outstanding = 0;
+    last_sent = Timer();
+    last_received = Timer();
+    ping_outstanding = false;
     for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
         messageHandlers[i].topicFilter = 0;
-    this->command_timeout_ms = command_timeout_ms; 
+    this->command_timeout_ms = command_timeout_ms;
     isconnected = false;
+    
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    inflightMsgid = 0;
+    inflightQoS = QOS0;
+#endif
+
+    
+#if MQTTCLIENT_QOS2
+    pubrel = false;
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+        incomingQoS2messages[i] = 0;
+#endif
+}
+
+#if MQTTCLIENT_QOS2
+template<class Network, class Timer, int a, int b>
+bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
+{
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+    {
+        if (incomingQoS2messages[i] == id)
+            return false;
+    }
+    return true;
 }
 
 
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
+bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
+{
+    for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
+    {
+        if (incomingQoS2messages[i] == 0)
+        {
+            incomingQoS2messages[i] = id;
+            return true;
+        }
+    }
+    return false;
+}
+#endif
+
+
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
 {
-    int rc = FAILURE, 
+    int rc = FAILURE,
         sent = 0;
-    
+
     while (sent < length && !timer.expired())
     {
-        rc = ipstack.write(&buf[sent], length, timer.left_ms());
+        rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
         if (rc < 0)  // there was an error writing the data
             break;
         sent += rc;
     }
     if (sent == length)
     {
-        ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
+        if (this->keepAliveInterval > 0)
+            last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
         rc = SUCCESS;
-        //if (debug)
-        // Log (packet)
     }
     else
         rc = FAILURE;
+        
+#if defined(MQTT_DEBUG)
+    char printbuf[50];
+    DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
+#endif
     return rc;
 }
 
 
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
 {
     unsigned char c;
@@ -287,12 +360,12 @@
 
 /**
  * If any read fails in this method, then we should disconnect from the network, as on reconnect
- * the packets can be retried. 
+ * the packets can be retried.
  * @param timeout the max time to wait for the packet read to complete, in milliseconds
  * @return the MQTT packet type, or -1 if none
  */
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
+template<class Network, class Timer, int a, int b>
+int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
 {
     int rc = FAILURE;
     MQTTHeader header = {0};
@@ -306,15 +379,22 @@
     len = 1;
     /* 2. read the remaining length.  This is variable in itself */
     decodePacket(&rem_len, timer.left_ms());
-    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
+    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
 
     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
+    if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
         goto exit;
 
     header.byte = readbuf[0];
     rc = header.bits.type;
+    if (this->keepAliveInterval > 0)
+        last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
 exit:
+        
+#if defined(MQTT_DEBUG)
+    char printbuf[50];
+    DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
+#endif
     return rc;
 }
 
@@ -322,13 +402,13 @@
 // assume topic filter and name is in correct format
 // # can only be at end
 // + and # can only be next to separator
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
 {
     char* curf = topicFilter;
     char* curn = topicName.lenstring.data;
     char* curn_end = curn + topicName.lenstring.len;
-    
+
     while (*curf && curn < curn_end)
     {
         if (*curn == '/' && *curf != '/')
@@ -346,13 +426,13 @@
         curf++;
         curn++;
     };
-    
+
     return (curn == curn_end) && (*curf == '\0');
 }
 
 
 
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
 {
     int rc = FAILURE;
@@ -371,25 +451,25 @@
             }
         }
     }
-    
-    if (rc == FAILURE && defaultMessageHandler.attached()) 
+
+    if (rc == FAILURE && defaultMessageHandler.attached())
     {
         MessageData md(topicName, message);
         defaultMessageHandler(md);
         rc = SUCCESS;
-    }   
-    
+    }
+
     return rc;
 }
 
 
 
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
+template<class Network, class Timer, int a, int b>
+int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
 {
     int rc = SUCCESS;
     Timer timer = Timer();
-    
+
     timer.countdown_ms(timeout_ms);
     while (!timer.expired())
     {
@@ -399,19 +479,19 @@
             break;
         }
     }
-        
+
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
 {
     /* get one piece of work off the wire and one pass through */
 
     // read the socket, see what work is due
     unsigned short packet_type = readPacket(timer);
-    
+
     int len = 0,
         rc = SUCCESS;
 
@@ -427,21 +507,26 @@
             if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
                                  (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 goto exit;
-//          if (msg.qos != QOS2) 
+#if MQTTCLIENT_QOS2
+            if (msg.qos != QOS2)
+#endif
                 deliverMessage(topicName, msg);
-#if 0
+#if MQTTCLIENT_QOS2
             else if (isQoS2msgidFree(msg.id))
             {
-                UseQoS2msgid(msg.id);
-                deliverMessage(topicName, msg);
-            }
+                if (useQoS2msgid(msg.id))
+                    deliverMessage(topicName, msg);
+                else
+                    WARN("Maximum number of incoming QoS2 messages exceeded");
+            }   
 #endif
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
             if (msg.qos != QOS0)
             {
                 if (msg.qos == QOS1)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
+                    len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
                 else if (msg.qos == QOS2)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
+                    len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
                 if (len <= 0)
                     rc = FAILURE;
                 else
@@ -450,12 +535,14 @@
                     goto exit; // there was a problem
             }
             break;
+#endif
+#if MQTTCLIENT_QOS2
         case PUBREC:
             unsigned short mypacketid;
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
-            else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
+            else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
                 rc = FAILURE;
             else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
                 rc = FAILURE; // there was a problem
@@ -464,6 +551,7 @@
             break;
         case PUBCOMP:
             break;
+#endif
         case PINGRESP:
             ping_outstanding = false;
             break;
@@ -487,12 +575,12 @@
         goto exit;
     }
 
-    if (ping_timer.expired())
+    if (last_sent.expired() || last_received.expired())
     {
         if (!ping_outstanding)
         {
             Timer timer = Timer(1000);
-            int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
+            int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
             if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
                 ping_outstanding = true;
         }
@@ -504,43 +592,41 @@
 
 
 // only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, int a, int b> 
+template<class Network, class Timer, int a, int b>
 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
 {
     int rc = FAILURE;
-    
+
     do
     {
-        if (timer.expired()) 
+        if (timer.expired())
             break; // we timed out
     }
-    while ((rc = cycle(timer)) != packet_type);  
-    
+    while ((rc = cycle(timer)) != packet_type);
+
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
 {
     Timer connect_timer = Timer(command_timeout_ms);
     int rc = FAILURE;
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
     int len = 0;
-    
+
     if (isconnected) // don't send connect packet again if we are already connected
         goto exit;
 
-    if (options == 0)
-        options = &default_options; // set default options if none were supplied
-    
-    this->keepAliveInterval = options->keepAliveInterval;
-    ping_timer.countdown(this->keepAliveInterval);
-    if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
+    this->keepAliveInterval = options.keepAliveInterval;
+    this->cleansession = options.cleansession;
+    if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
         goto exit;
     if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
         goto exit; // there was a problem
-    
+
+    if (this->keepAliveInterval > 0)
+        last_received.countdown(this->keepAliveInterval);
     // this will be a blocking call, wait for the connack
     if (waitfor(CONNACK, connect_timer) == CONNACK)
     {
@@ -553,7 +639,26 @@
     }
     else
         rc = FAILURE;
-    
+        
+#if MQTTCLIENT_QOS2
+    // resend an inflight publish
+    if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
+    {
+        if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
+            rc = FAILURE;
+        else
+            rc = publish(len, connect_timer, inflightQoS);
+    }
+    else
+#endif
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (inflightMsgid > 0)
+    {
+        memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
+        rc = publish(inflightLen, connect_timer, inflightQoS);
+    }
+#endif
+
 exit:
     if (rc == SUCCESS)
         isconnected = true;
@@ -561,29 +666,37 @@
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
+{
+    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+    return connect(default_options);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
-{ 
-    int rc = FAILURE;  
+{
+    int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);
     int len = 0;
     MQTTString topic = {(char*)topicFilter, 0, 0};
-    
+
     if (!isconnected)
         goto exit;
-    
-    len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+
+    len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
     if (len <= 0)
         goto exit;
     if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
         goto exit;             // there was a problem
-    
-    if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
+
+    if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback
     {
         int count = 0, grantedQoS = -1;
         unsigned short mypacketid;
         if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80 
+            rc = grantedQoS; // 0, 1, 2 or 0x80
         if (rc != 0x80)
         {
             for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
@@ -598,67 +711,58 @@
             }
         }
     }
-    else 
+    else
         rc = FAILURE;
-        
+
 exit:
+    if (rc != SUCCESS)
+        isconnected = false;
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
-{   
+{
     int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);    
+    Timer timer = Timer(command_timeout_ms);
     MQTTString topic = {(char*)topicFilter, 0, 0};
     int len = 0;
-    
+
     if (!isconnected)
         goto exit;
-    
-    if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
+
+    if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
         goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
         goto exit; // there was a problem
-    
+
     if (waitfor(UNSUBACK, timer) == UNSUBACK)
     {
         unsigned short mypacketid;  // should be the same as the packetid above
         if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = 0; 
+            rc = 0;
     }
     else
         rc = FAILURE;
-    
+
 exit:
+    if (rc != SUCCESS)
+        isconnected = false;
     return rc;
 }
 
 
-   
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
 {
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);   
-    MQTTString topicString = {(char*)topicName, 0, 0};
-    int len = 0;
+    int rc;
     
-    if (!isconnected)
-        goto exit;
+    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
+        goto exit; // there was a problem
 
-    if (message->qos == QOS1 || message->qos == QOS2)
-        message->id = packetid.getNext();
-    
-    len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
-              topicString, (unsigned char*)message->payload, message->payloadlen);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit; // there was a problem
-    
-    if (message->qos == QOS1)
+#if MQTTCLIENT_QOS1 
+    if (qos == QOS1)
     {
         if (waitfor(PUBACK, timer) == PUBACK)
         {
@@ -666,11 +770,14 @@
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
+            else if (inflightMsgid == mypacketid)
+                inflightMsgid = 0;
         }
         else
             rc = FAILURE;
     }
-    else if (message->qos == QOS2)
+#elif MQTTCLIENT_QOS2
+    else if (qos == QOS2)
     {
         if (waitfor(PUBCOMP, timer) == PUBCOMP)
         {
@@ -678,28 +785,91 @@
             unsigned char dup, type;
             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
                 rc = FAILURE;
+            else if (inflightMsgid == mypacketid)
+                inflightMsgid = 0;
         }
         else
             rc = FAILURE;
     }
-    
+#endif
+
+exit:
+    if (rc != SUCCESS)
+        isconnected = false;
+    return rc;
+}
+
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
+{
+    int rc = FAILURE;
+    Timer timer = Timer(command_timeout_ms);
+    MQTTString topicString = MQTTString_initializer;
+    int len = 0;
+
+    if (!isconnected)
+        goto exit;
+        
+    topicString.cstring = (char*)topicName;
+
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (qos == QOS1 || qos == QOS2)
+        id = packetid.getNext();
+#endif
+
+    len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
+              topicString, (unsigned char*)payload, payloadlen);
+    if (len <= 0)
+        goto exit;
+        
+#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
+    if (!cleansession)
+    {
+        memcpy(pubbuf, sendbuf, len);
+        inflightMsgid = id;
+        inflightLen = len;
+        inflightQoS = qos;
+#if MQTTCLIENT_QOS2
+        pubrel = false;
+#endif
+    }
+#endif
+        
+    rc = publish(len, timer, qos);
 exit:
     return rc;
 }
 
 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
+{
+    unsigned short id = 0;  // dummy - not used for anything
+    return publish(topicName, payload, payloadlen, id, qos, retained);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
+int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
+{
+    return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
+}
+
+
+template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
-{  
+{
     int rc = FAILURE;
     Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
     int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
     if (len > 0)
         rc = sendPacket(len, timer);            // send the disconnect packet
-        
+
     isconnected = false;
     return rc;
 }
 
 
-#endif
\ No newline at end of file
+#endif
--- a/MQTT/MQTTEthernet.h	Wed Aug 20 13:46:35 2014 +0000
+++ b/MQTT/MQTTEthernet.h	Wed Oct 01 13:27:35 2014 +0000
@@ -2,7 +2,7 @@
 #if !defined(MQTTETHERNET_H)
 #define MQTTETHERNET_H
 
-#include "MQTT_mbed.h"
+#include "MQTTmbed.h"
 #include "EthernetInterface.h"
 #include "MQTTSocket.h"
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT/MQTTLogging.h	Wed Oct 01 13:27:35 2014 +0000
@@ -0,0 +1,39 @@
+#if !defined(MQTT_LOGGING_H)
+#define MQTT_LOGGING_H
+
+#define STREAM      stdout
+#if !defined(DEBUG)
+#define DEBUG(...)    \
+    {\
+    fprintf(STREAM, "DEBUG:   %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
+    fprintf(STREAM, ##__VA_ARGS__); \
+    fflush(STREAM); \
+    }
+#endif
+#if !defined(LOG)
+#define LOG(...)    \
+    {\
+    fprintf(STREAM, "LOG:   %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
+    fprintf(STREAM, ##__VA_ARGS__); \
+    fflush(STREAM); \
+    }
+#endif
+#if !defined(WARN)
+#define WARN(...)   \
+    { \
+    fprintf(STREAM, "WARN:  %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
+    fprintf(STREAM, ##__VA_ARGS__); \
+    fflush(STREAM); \
+    }
+#endif 
+#if !defined(ERROR)
+#define ERROR(...)  \
+    { \
+    fprintf(STREAM, "ERROR: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \
+    fprintf(STREAM, ##__VA_ARGS__); \
+    fflush(STREAM); \
+    exit(1); \
+    }
+#endif
+
+#endif
\ No newline at end of file
--- a/MQTT/MQTTSocket.h	Wed Aug 20 13:46:35 2014 +0000
+++ b/MQTT/MQTTSocket.h	Wed Oct 01 13:27:35 2014 +0000
@@ -1,7 +1,7 @@
 #if !defined(MQTTSOCKET_H)
 #define MQTTSOCKET_H
 
-#include "MQTT_mbed.h"
+#include "MQTTmbed.h"
 #include "TCPSocketConnection.h"
 
 class MQTTSocket
--- a/MQTT/MQTT_logging.h	Wed Aug 20 13:46:35 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,39 +0,0 @@
-#if !defined(MQTT_LOGGING_H)
-#define MQTT_LOGGING_H
-
-#define STREAM      stdout
-#if !defined(DEBUG)
-#define DEBUG(...)    \
-    {\
-    fprintf(STREAM, "DEBUG:   %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
-    fprintf(STREAM, ##__VA_ARGS__); \
-    fflush(STREAM); \
-    }
-#endif
-#if !defined(LOG)
-#define LOG(...)    \
-    {\
-    fprintf(STREAM, "LOG:   %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
-    fprintf(STREAM, ##__VA_ARGS__); \
-    fflush(STREAM); \
-    }
-#endif
-#if !defined(WARN)
-#define WARN(...)   \
-    { \
-    fprintf(STREAM, "WARN:  %s L#%d ", __PRETTY_FUNCTION__, __LINE__);  \
-    fprintf(STREAM, ##__VA_ARGS__); \
-    fflush(STREAM); \
-    }
-#endif 
-#if !defined(ERROR)
-#define ERROR(...)  \
-    { \
-    fprintf(STREAM, "ERROR: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \
-    fprintf(STREAM, ##__VA_ARGS__); \
-    fflush(STREAM); \
-    exit(1); \
-    }
-#endif
-
-#endif
\ No newline at end of file
--- a/MQTT/MQTT_mbed.h	Wed Aug 20 13:46:35 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,49 +0,0 @@
-#if !defined(MQTT_MBED_H)
-#define MQTT_MBED_H
-
-#include "mbed.h"
-
-class Countdown
-{
-public:
-    Countdown()
-    {
-        t = Timer();   
-    }
-    
-    Countdown(int ms)
-    {
-        t = Timer();
-        countdown_ms(ms);   
-    }
-    
-    
-    bool expired()
-    {
-        return t.read_ms() >= interval_end_ms;
-    }
-    
-    void countdown_ms(int ms)  
-    {
-        t.stop();
-        interval_end_ms = ms;
-        t.reset();
-        t.start();
-    }
-    
-    void countdown(int seconds)
-    {
-        countdown_ms(seconds * 1000);
-    }
-    
-    int left_ms()
-    {
-        return interval_end_ms - t.read_ms();
-    }
-    
-private:
-    Timer t;
-    int interval_end_ms; 
-};
-
-#endif
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT/MQTTmbed.h	Wed Oct 01 13:27:35 2014 +0000
@@ -0,0 +1,49 @@
+#if !defined(MQTT_MBED_H)
+#define MQTT_MBED_H
+
+#include "mbed.h"
+
+class Countdown
+{
+public:
+    Countdown()
+    {
+        t = Timer();   
+    }
+    
+    Countdown(int ms)
+    {
+        t = Timer();
+        countdown_ms(ms);   
+    }
+    
+    
+    bool expired()
+    {
+        return t.read_ms() >= interval_end_ms;
+    }
+    
+    void countdown_ms(unsigned long ms)  
+    {
+        t.stop();
+        interval_end_ms = ms;
+        t.reset();
+        t.start();
+    }
+    
+    void countdown(int seconds)
+    {
+        countdown_ms((unsigned long)seconds * 1000L);
+    }
+    
+    int left_ms()
+    {
+        return interval_end_ms - t.read_ms();
+    }
+    
+private:
+    Timer t;
+    unsigned long interval_end_ms; 
+};
+
+#endif
\ No newline at end of file
--- a/main.cpp	Wed Aug 20 13:46:35 2014 +0000
+++ b/main.cpp	Wed Oct 01 13:27:35 2014 +0000
@@ -13,6 +13,7 @@
  * Contributors:
  *    Sam Danbury - initial implementation
  *    Ian Craggs - refactoring to remove STL and other changes
+ *    Sam Grove  - added check for Ethernet cable.
  *******************************************************************************/
 
 #include "LM75B.h"
@@ -24,20 +25,10 @@
 #include "rtos.h"
 
 // Configuration values needed to connect to IBM IoT Cloud
-#define QUICKSTARTMODE 1
-#if (QUICKSTARTMODE)
-#define ORG "quickstart"
-#define ID ""
-#define AUTH_METHOD ""
-#define AUTH_TOKEN ""
-#define TYPE DEFAULT_TYPE_NAME
-#else
-#define ORG "Replace with your org"
-#define ID "Replace with your id"
-#define TYPE "Replace with your type"
-#define AUTH_METHOD "Replace with your auth-method"
-#define AUTH_TOKEN "Replace with your auth-token"
-#endif
+#define ORG "quickstart"             // For a registered connection, replace with your org
+#define ID ""                        // For a registered connection, replace with your id
+#define AUTH_TOKEN ""                // For a registered connection, replace with your auth-token
+#define TYPE DEFAULT_TYPE_NAME       // For a registered connection, replace with your type
 
 #define MQTT_PORT 1883
 #define MQTT_TLS_PORT 8883
@@ -53,7 +44,7 @@
 #include "K64F.h"
 #endif
 
-bool quickstartMode = (QUICKSTARTMODE) ? true : false;
+bool quickstartMode = true;
 char org[11] = ORG;  
 char type[30] = TYPE;
 char id[30] = ID;                 // mac without colons
@@ -210,7 +201,7 @@
         data.password.cstring = auth_token;
     }
     
-    if ((rc = client->connect(&data)) == 0) 
+    if ((rc = client->connect(data)) == 0) 
     {       
         connected = true;
         green();    
@@ -233,6 +224,12 @@
 {
     int retryAttempt = 0;
     connected = false;
+    
+    // make sure a cable is connected before starting to connect
+    while (!linkStatus()) {
+        wait(1.0f);
+        WARN("Ethernet link not present. Check cable connection\n");
+    }
         
     while (connect(client, ipstack) != 0) 
     {    
@@ -243,7 +240,15 @@
 #endif
         int timeout = getConnTimeout(++retryAttempt);
         WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout);
-        wait(timeout);
+        
+        // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed
+        //  or maybe just add the proper members to do this disconnect and call attemptConnect(...)
+        
+        // this works - reset the system when the retry count gets to a threshold
+        if (retryAttempt == 5)
+            NVIC_SystemReset();
+        else
+            wait(timeout);
     }
 }
 
@@ -264,7 +269,7 @@
     message.payloadlen = strlen(buf);
     
     LOG("Publishing %s\n", buf);
-    return client->publish(pubTopic, &message);
+    return client->publish(pubTopic, message);
 }
 
 
@@ -339,6 +344,8 @@
 
 int main()
 {    
+    quickstartMode = (strcmp(org, "quickstart") == 0);
+
     lcd.set_font((unsigned char*) Arial12x12);  // Set a nice font for the LCD screen
     
     led2 = LED2_OFF; // K64F: turn off the main board LED