MQTT cellular example

Dependencies:   C027_Support C12832 MQTT mbed

Fork of Cellular_HelloMQTT by Michael Ammann

Files at this revision

API Documentation at this revision

Comitter:
mazgch
Date:
Tue May 27 09:23:07 2014 +0000
Parent:
13:500c2e7a57d6
Child:
15:710f99b36ff2
Commit message:
pull latest libs (use new API)

Changed in this revision

C027_Support.lib Show annotated file Show diff for this revision Revisions of this file
MQTT.lib Show annotated file Show diff for this revision Revisions of this file
MQTT/FP.lib Show diff for this revision Revisions of this file
MQTT/IPStack/LinuxIPStack.h Show diff for this revision Revisions of this file
MQTT/IPStack/MbedIPStack.h Show diff for this revision Revisions of this file
MQTT/MQTTAsync.h Show diff for this revision Revisions of this file
MQTT/MQTTClient.h Show diff for this revision Revisions of this file
MQTT/MQTTPacket.lib Show diff for this revision Revisions of this file
UbloxLib.lib Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed-rtos.lib Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/C027_Support.lib	Tue May 27 09:23:07 2014 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/teams/ublox/code/C027_Support/#34985b4d821e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT.lib	Tue May 27 09:23:07 2014 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/teams/mqtt/code/MQTT/#a51dd239b78e
--- a/MQTT/FP.lib	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-http://mbed.org/users/sam_grove/code/FP/#e0f19cdaa46e
--- a/MQTT/IPStack/MbedIPStack.h	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,86 +0,0 @@
-#include "TCPSocketConnection.h"
-
-#ifndef MBEDIPSTACK_H
-#define MBEDIPSTACK_H
-
-class IPStack 
-{
-public:    
-    IPStack()
-    {
-        mysock.set_blocking(false, 1000);    // 1 second Timeout 
-    }
-    
-    int connect(char* hostname, int port)
-    {
-        return mysock.connect(hostname, port);
-    }
-
-    int read(char* buffer, int len, int timeout)
-    {
-        mysock.set_blocking(false, timeout);  
-        return mysock.receive(buffer, len);
-    }
-    
-    int write(char* buffer, int len, int timeout)
-    {
-        mysock.set_blocking(false, timeout);  
-        return mysock.send(buffer, len);
-    }
-    
-    int disconnect()
-    {
-        return mysock.close();
-    }
-    
-private:
-
-    TCPSocketConnection mysock; 
-    
-};
-
-
-class Countdown
-{
-public:
-    Countdown()
-    {
-        t = Timer();   
-    }
-    
-    Countdown(int ms)
-    {
-        t = Timer();
-        countdown_ms(ms);   
-    }
-    
-    
-    bool expired()
-    {
-        return t.read_ms() >= interval_end_ms;
-    }
-    
-    void countdown_ms(int ms)  
-    {
-        t.stop();
-        interval_end_ms = ms;
-        t.reset();
-        t.start();
-    }
-    
-    void countdown(int seconds)
-    {
-        countdown_ms(seconds * 1000);
-    }
-    
-    int left_ms()
-    {
-        return interval_end_ms - t.read_ms();
-    }
-    
-private:
-    Timer t;
-    int interval_end_ms; 
-};
-
-#endif
--- a/MQTT/MQTTAsync.h	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,607 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- *    http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- *   http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- *    Ian Craggs - initial API and implementation and/or initial documentation
- *******************************************************************************/
-
-#if !defined(MQTTASYNC_H)
-#define MQTTASYNC_H
-
-#include "FP.h"
-#include "MQTTPacket.h"
-#include "stdio.h"
-
-namespace MQTT
-{
-
-
-enum QoS { QOS0, QOS1, QOS2 };
-
-
-struct Message
-{
-    enum QoS qos;
-    bool retained;
-    bool dup;
-    unsigned short id;
-    void *payload;
-    size_t payloadlen;
-};
-
-
-class PacketId
-{
-public:
-    PacketId();
-    
-    int getNext();
-   
-private:
-    static const int MAX_PACKET_ID = 65535;
-    int next;
-};
-
-typedef void (*messageHandler)(Message*);
-
-typedef struct limits
-{
-	int MAX_MQTT_PACKET_SIZE; // 
-	int MAX_MESSAGE_HANDLERS;  // each subscription requires a message handler
-	int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
-	int command_timeout_ms;
-		
-	limits()
-	{
-		MAX_MQTT_PACKET_SIZE = 100;
-		MAX_MESSAGE_HANDLERS = 5;
-		MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
-		command_timeout_ms = 30000;
-	}
-} Limits;
-  
-
-/**
- * @class Async
- * @brief non-blocking, threaded MQTT client API
- * @param Network a network class which supports send, receive
- * @param Timer a timer class with the methods: 
- */ 
-template<class Network, class Timer, class Thread, class Mutex> class Async
-{
-    
-public:    
-
-	struct Result
-	{
-    	/* success or failure result data */
-    	Async<Network, Timer, Thread, Mutex>* client;
-		int rc;
-	};
-
-	typedef void (*resultHandler)(Result*);	
-   
-    Async(Network* network, const Limits limits = Limits()); 
-        
-    typedef struct
-    {
-        Async* client;
-        Network* network;
-    } connectionLostInfo;
-    
-    typedef int (*connectionLostHandlers)(connectionLostInfo*);
-    
-    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
-     *  @param clh - pointer to the callback function
-     */
-    void setConnectionLostHandler(connectionLostHandlers clh)
-    {
-        connectionLostHandler.attach(clh);
-    }
-    
-    /** Set the default message handling callback - used for any message which does not match a subscription message handler
-     *  @param mh - pointer to the callback function
-     */
-    void setDefaultMessageHandler(messageHandler mh)
-    {
-        defaultMessageHandler.attach(mh);
-    }
-           
-    int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
-    
-     template<class T>
-    int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0);  // alternative to pass in pointer to member function
-        
-    int publish(resultHandler rh, const char* topic, Message* message);
-    
-    int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
-    
-    int unsubscribe(resultHandler rh, const char* topicFilter);
-    
-    int disconnect(resultHandler rh);
-    
-private:
-
-    void run(void const *argument);
-    int cycle(int timeout);
-    int waitfor(int packet_type, Timer& atimer);
-	int keepalive();
-	int findFreeOperation();
-
-    int decodePacket(int* value, int timeout);
-    int readPacket(int timeout);
-    int sendPacket(int length, int timeout);
-	int deliverMessage(MQTTString* topic, Message* message);
-    
-    Thread* thread;
-    Network* ipstack;
-    
-    Limits limits;
-    
-    char* buf;  
-    char* readbuf;
-
-    Timer ping_timer, connect_timer;
-    unsigned int keepAliveInterval;
-	bool ping_outstanding;
-    
-    PacketId packetid;
-    
-    typedef FP<void, Result*> resultHandlerFP;    
-    resultHandlerFP connectHandler; 
-    
-    typedef FP<void, Message*> messageHandlerFP;
-    struct MessageHandlers
-    {
-    	const char* topic;
-    	messageHandlerFP fp;
-    } *messageHandlers;      // Message handlers are indexed by subscription topic
-    
-    // how many concurrent operations should we allow?  Each one will require a function pointer
-    struct Operations
-    {
-    	unsigned short id;
-    	resultHandlerFP fp;
-    	const char* topic;         // if this is a publish, store topic name in case republishing is required
-    	Message* message;    // for publish, 
-    	Timer timer;         // to check if the command has timed out
-    } *operations;           // result handlers are indexed by packet ids
-
-	static void threadfn(void* arg);
-	
-	messageHandlerFP defaultMessageHandler;
-    
-    typedef FP<int, connectionLostInfo*> connectionLostFP;
-    
-    connectionLostFP connectionLostHandler;
-    
-};
-
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
-{
-    ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits)  : limits(limits), packetid()
-{
-	this->thread = 0;
-	this->ipstack = network;
-	this->ping_timer = Timer();
-	this->ping_outstanding = 0;
-	   
-	// How to make these memory allocations portable?  I was hoping to avoid the heap
-	buf = new char[limits.MAX_MQTT_PACKET_SIZE];
-	readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
-	this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
-	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
-		operations[i].id = 0;
-	this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
-	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
-		messageHandlers[i].topic = 0;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
-{
-    int sent = 0;
-    
-    while (sent < length)
-        sent += ipstack->write(&buf[sent], length, timeout);
-	if (sent == length)
-	    ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
-    return sent;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
-{
-    char c;
-    int multiplier = 1;
-    int len = 0;
-	const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
-
-    *value = 0;
-    do
-    {
-        int rc = MQTTPACKET_READ_ERROR;
-
-        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
-        {
-            rc = MQTTPACKET_READ_ERROR; /* bad data */
-            goto exit;
-        }
-        rc = ipstack->read(&c, 1, timeout);
-        if (rc != 1)
-            goto exit;
-        *value += (c & 127) * multiplier;
-        multiplier *= 128;
-    } while ((c & 128) != 0);
-exit:
-    return len;
-}
-
-
-/**
- * If any read fails in this method, then we should disconnect from the network, as on reconnect
- * the packets can be retried. 
- * @param timeout the max time to wait for the packet read to complete, in milliseconds
- * @return the MQTT packet type, or -1 if none
- */
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) 
-{
-    int rc = -1;
-    MQTTHeader header = {0};
-    int len = 0;
-    int rem_len = 0;
-
-    /* 1. read the header byte.  This has the packet type in it */
-    if (ipstack->read(readbuf, 1, timeout) != 1)
-        goto exit;
-
-    len = 1;
-    /* 2. read the remaining length.  This is variable in itself */
-    decodePacket(&rem_len, timeout);
-    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
-
-    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
-        goto exit;
-
-    header.byte = readbuf[0];
-    rc = header.bits.type;
-exit:
-    return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
-{
-	int rc = -1;
-
-	// we have to find the right message handler - indexed by topic
-	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
-	{
-		if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
-		{
-			messageHandlers[i].fp(message);
-			rc = 0;
-			break;
-		}
-	}
-	
-	return rc;
-}
-
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
-{
-    /* get one piece of work off the wire and one pass through */
-
-    // read the socket, see what work is due
-    int packet_type = readPacket(timeout);
-    
-	int len, rc;
-    switch (packet_type)
-    {
-        case CONNACK:
-			if (this->thread)
-			{
-				Result res = {this, 0};
-            	if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-                	;
-				connectHandler(&res);
-				connectHandler.detach(); // only invoke the callback once
-			}
-			break;
-        case PUBACK:
-        	if (this->thread)
-        		; //call resultHandler
-        case SUBACK:
-            break;
-        case PUBLISH:
-			MQTTString topicName;
-			Message msg;
-			rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-								 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
-			if (msg.qos == QOS0)
-				deliverMessage(&topicName, &msg);
-            break;
-        case PUBREC:
-   	        int type, dup, mypacketid;
-   	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-   	            ; 
-   	        // must lock this access against the application thread, if we are multi-threaded
-			len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
-		    rc = sendPacket(len, timeout); // send the PUBREL packet
-			if (rc != len) 
-				goto exit; // there was a problem
-
-            break;
-        case PUBCOMP:
-            break;
-        case PINGRESP:
-			ping_outstanding = false;
-            break;
-    }
-	keepalive();
-exit:
-    return packet_type;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
-{
-	int rc = 0;
-
-	if (keepAliveInterval == 0)
-		goto exit;
-
-	if (ping_timer.expired())
-	{
-		if (ping_outstanding)
-			rc = -1;
-		else
-		{
-			int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
-			rc = sendPacket(len, 1000); // send the ping packet
-			if (rc != len) 
-				rc = -1; // indicate there's a problem
-			else
-				ping_outstanding = true;
-		}
-	}
-
-exit:
-	return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
-{
-	while (true)
-		cycle(ping_timer.left_ms());
-}
-
-
-// only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
-{
-	int rc = -1;
-	
-	do
-    {
-		if (atimer.expired()) 
-			break; // we timed out
-	}
-	while ((rc = cycle(atimer.left_ms())) != packet_type);	
-	
-	return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
-{
-	connect_timer.countdown(limits.command_timeout_ms);
-
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
-    if (options == 0)
-        options = &default_options; // set default options if none were supplied
-    
-    this->keepAliveInterval = options->keepAliveInterval;
-	ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
-    int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
-	if (rc != len) 
-		goto exit; // there was a problem
-    
-    if (resultHandler == 0)     // wait until the connack is received 
-    {
-        // this will be a blocking call, wait for the connack
-		if (waitfor(CONNACK, connect_timer) == CONNACK)
-		{
-        	int connack_rc = -1;
-        	if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-	        	rc = connack_rc;
-	    }
-    }
-    else
-    {
-        // set connect response callback function
-        connectHandler.attach(resultHandler);
-        
-        // start background thread            
-        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
-    }
-    
-exit:
-    return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
-{
-	int found = -1;
-	for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
-	{
-		if (operations[i].id == 0)
-		{
-			found = i;
-			break;
-		}
-	}
-	return found;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
-{
-	int index = 0;
-	if (this->thread)
-		index = findFreeOperation();	
-	Timer& atimer = operations[index].timer;
-	
-	atimer.countdown(limits.command_timeout_ms);
-    MQTTString topic = {(char*)topicFilter, 0, 0};
-    
-    int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
-	if (rc != len) 
-		goto exit; // there was a problem
-    
-    /* wait for suback */
-    if (resultHandler == 0)
-    {
-        // this will block
-        if (waitfor(SUBACK, atimer) == SUBACK)
-        {
-            int count = 0, grantedQoS = -1, mypacketid;
-            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-                rc = grantedQoS; // 0, 1, 2 or 0x80 
-            if (rc != 0x80)
-            {
-            	for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
-				{
-					if (messageHandlers[i].topic == 0)
-					{
-						messageHandlers[i].topic = topicFilter;
-						messageHandlers[i].fp.attach(messageHandler);
-						rc = 0;
-						break;
-					}
-				}
-            }
-        }
-    }
-    else
-    {
-        // set subscribe response callback function
-        
-    }
-    
-exit:
-    return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
-{
-	int index = 0;
-	if (this->thread)
-		index = findFreeOperation();	
-	Timer& atimer = operations[index].timer;
-
-	atimer.countdown(limits.command_timeout_ms);
-    MQTTString topic = {(char*)topicFilter, 0, 0};
-    
-    int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
-    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
-	if (rc != len) 
-		goto exit; // there was a problem
-    
-    // set unsubscribe response callback function
-        
-    
-exit:
-    return rc;
-}
-
-
-   
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
-{
-	int index = 0;
-	if (this->thread)
-		index = findFreeOperation();	
-	Timer& atimer = operations[index].timer;
-
-	atimer.countdown(limits.command_timeout_ms);
-    MQTTString topic = {(char*)topicName, 0, 0};
-
-	if (message->qos == QOS1 || message->qos == QOS2)
-		message->id = packetid.getNext();
-    
-	int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
-    int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
-	if (rc != len) 
-		goto exit; // there was a problem
-    
-    /* wait for acks */
-    if (resultHandler == 0)
-    {
- 		if (message->qos == QOS1)
-		{
-	        if (waitfor(PUBACK, atimer) == PUBACK)
-    	    {
-    	        int type, dup, mypacketid;
-    	        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-    	            rc = 0; 
-    	    }
-		}
-		else if (message->qos == QOS2)
-		{
-	        if (waitfor(PUBCOMP, atimer) == PUBCOMP)
-	   	    {
-	   	    	int type, dup, mypacketid;
-            	if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
-    	           	rc = 0; 
-			}
-
-		}
-    }
-    else
-    {
-        // set publish response callback function
-        
-    }
-    
-exit:
-    return rc;
-}
-
-
-template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
-{  
-    Timer timer = Timer(limits.command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
-    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
-    int rc = sendPacket(len, timer.left_ms());   // send the disconnect packet
-    
-    return (rc == len) ? 0 : -1;
-}
-
-
-
-#endif
\ No newline at end of file
--- a/MQTT/MQTTClient.h	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,653 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- *    http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- *   http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- *    Ian Craggs - initial API and implementation and/or initial documentation
- *******************************************************************************/
- 
- /*
- 
- TODO: 
- 
- ensure publish packets are retried on reconnect
- 
- updating usage of FP. Try to remove inclusion of FP.cpp in main. sg-
- 
- */
-
-#if !defined(MQTTCLIENT_H)
-#define MQTTCLIENT_H
-
-#include "FP.h"
-#include "MQTTPacket.h"
-#include "stdio.h"
-
-namespace MQTT
-{
-
-
-enum QoS { QOS0, QOS1, QOS2 };
-
-enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
-
-
-struct Message
-{
-    enum QoS qos;
-    bool retained;
-    bool dup;
-    unsigned short id;
-    void *payload;
-    size_t payloadlen;
-};
-
-
-struct MessageData
-{
-    struct Message message;
-    char* topicName;
-};
-
-
-class PacketId
-{
-public:
-    PacketId()
-    {
-        next = 0;
-    }
-    
-    int getNext()
-    {
-        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
-    }
-   
-private:
-    static const int MAX_PACKET_ID = 65535;
-    int next;
-};
-  
-  
-/**
- * @class Client
- * @brief blocking, non-threaded MQTT client API
- * 
- * This version of the API blocks on all method calls, until they are complete.  This means that only one
- * MQTT request can be in process at any one time.  
- * @param Network a network class which supports send, receive
- * @param Timer a timer class with the methods: 
- */ 
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
-{
-    
-public:
-   
-    typedef void (*messageHandler)(Message*);
-
-    /** Construct the client
-     *  @param network - pointer to an instance of the Network class - must be connected to the endpoint
-     *      before calling MQTT connect
-     *  @param limits an instance of the Limit class - to alter limits as required
-     */
-    Client(Network& network, unsigned int command_timeout_ms = 30000); 
-    
-    /** Set the default message handling callback - used for any message which does not match a subscription message handler
-     *  @param mh - pointer to the callback function
-     */
-    void setDefaultMessageHandler(messageHandler mh)
-    {
-        defaultMessageHandler.attach(mh);
-    }
-    
-    /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
-     *  The nework object must be connected to the network endpoint before calling this 
-     *  @param options - connect options
-     *  @return success code -  
-     */       
-    int connect(MQTTPacket_connectData* options = 0);
-      
-    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
-     *  @param topic - the topic to publish to
-     *  @param message - the message to send
-     *  @return success code -  
-     */      
-    int publish(const char* topicName, Message* message);
-   
-    /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
-     *  @param topicFilter - a topic pattern which can include wildcards
-     *  @param qos - the MQTT QoS to subscribe at
-     *  @param mh - the callback function to be invoked when a message is received for this subscription
-     *  @return success code -  
-     */   
-    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
-    
-    /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
-     *  @param topicFilter - a topic pattern which can include wildcards
-     *  @return success code -  
-     */   
-    int unsubscribe(const char* topicFilter);
-    
-    /** MQTT Disconnect - send an MQTT disconnect packet 
-     *  @return success code -  
-     */
-    int disconnect();
-    
-    /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
-     *  yield can be called if no other MQTT operation is needed.  This will also allow messages to be 
-     *  received.
-     *  @param timeout_ms the time to wait, in milliseconds
-     *  @return success code - on failure, this means the client has disconnected
-     */
-    int yield(int timeout_ms = 1000);
-    
-private:
-
-    int cycle(Timer& timer);
-    int waitfor(int packet_type, Timer& timer);
-    int keepalive();
-
-    int decodePacket(int* value, int timeout);
-    int readPacket(Timer& timer);
-    int sendPacket(int length, Timer& timer);
-    int deliverMessage(MQTTString& topicName, Message& message);
-    bool isTopicMatched(char* topicFilter, MQTTString& topicName);
-    
-    Network& ipstack;
-    unsigned int command_timeout_ms;
-    
-    char buf[MAX_MQTT_PACKET_SIZE];  
-    char readbuf[MAX_MQTT_PACKET_SIZE];  
-
-    Timer ping_timer;
-    unsigned int keepAliveInterval;
-    bool ping_outstanding;
-    
-    PacketId packetid;
-    
-    // typedef FP<void, Message*> messageHandlerFP;
-    struct MessageHandlers
-    {
-        const char* topicFilter;
-        //messageHandlerFP fp; typedefs not liked?
-        FP<void, Message*> fp;
-    } messageHandlers[MAX_MESSAGE_HANDLERS];      // Message handlers are indexed by subscription topic
-    
-    FP<void, Message*> defaultMessageHandler;
-     
-    bool isconnected;
-
-};
-
-}
-
-
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
-MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms)  : ipstack(network), packetid()
-{
-    ping_timer = Timer();
-    ping_outstanding = 0;
-    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-        messageHandlers[i].topicFilter = 0;
-    this->command_timeout_ms = command_timeout_ms; 
-    isconnected = false;
-}
-
-
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
-{
-    int rc = FAILURE, 
-        sent = 0;
-    
-    while (sent < length && !timer.expired())
-    {
-        rc = ipstack.write(&buf[sent], length, timer.left_ms());
-        if (rc < 0)  // there was an error writing the data
-            break;
-        sent += rc;
-    }
-    if (sent == length)
-    {
-        ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet    
-        rc = SUCCESS;
-    }
-    else
-        rc = FAILURE;
-    return rc;
-}
-
-
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
-{
-    char c;
-    int multiplier = 1;
-    int len = 0;
-    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
-
-    *value = 0;
-    do
-    {
-        int rc = MQTTPACKET_READ_ERROR;
-
-        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
-        {
-            rc = MQTTPACKET_READ_ERROR; /* bad data */
-            goto exit;
-        }
-        rc = ipstack.read(&c, 1, timeout);
-        if (rc != 1)
-            goto exit;
-        *value += (c & 127) * multiplier;
-        multiplier *= 128;
-    } while ((c & 128) != 0);
-exit:
-    return len;
-}
-
-
-/**
- * If any read fails in this method, then we should disconnect from the network, as on reconnect
- * the packets can be retried. 
- * @param timeout the max time to wait for the packet read to complete, in milliseconds
- * @return the MQTT packet type, or -1 if none
- */
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer) 
-{
-    int rc = FAILURE;
-    MQTTHeader header = {0};
-    int len = 0;
-    int rem_len = 0;
-
-    /* 1. read the header byte.  This has the packet type in it */
-    if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
-        goto exit;
-
-    len = 1;
-    /* 2. read the remaining length.  This is variable in itself */
-    decodePacket(&rem_len, timer.left_ms());
-    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
-
-    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
-    if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
-        goto exit;
-
-    header.byte = readbuf[0];
-    rc = header.bits.type;
-exit:
-    return rc;
-}
-
-
-// assume topic filter and name is in correct format
-// # can only be at end
-// + and # can only be next to separator
-template<class Network, class Timer, int a, int b> 
-bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
-{
-    char* curf = topicFilter;
-    char* curn = topicName.lenstring.data;
-    char* curn_end = curn + topicName.lenstring.len;
-    
-    while (*curf && curn < curn_end)
-    {
-        if (*curn == '/' && *curf != '/')
-            break;
-        if (*curf != '+' && *curf != '#' && *curf != *curn)
-            break;
-        if (*curf == '+')
-        {   // skip until we meet the next separator, or end of string
-            char* nextpos = curn + 1;
-            while (nextpos < curn_end && *nextpos != '/')
-                nextpos = ++curn + 1;
-        }
-        else if (*curf == '#')
-            curn = curn_end - 1;    // skip until end of string
-        curf++;
-        curn++;
-    };
-    
-    return (curn == curn_end) && (*curf == '\0');
-}
-
-
-
-template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> 
-int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
-{
-    int rc = FAILURE;
-
-    // we have to find the right message handler - indexed by topic
-    for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-    {
-        if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
-                isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
-        {
-            if (messageHandlers[i].fp.attached())
-            {
-                messageHandlers[i].fp(&message);
-                rc = SUCCESS;
-            }
-        }
-    }
-    
-    if (rc == FAILURE && defaultMessageHandler.attached()) 
-    {
-        defaultMessageHandler(&message);
-        rc = SUCCESS;
-    }   
-    
-    return rc;
-}
-
-
-
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
-{
-    int rc = SUCCESS;
-    Timer timer = Timer();
-    
-    timer.countdown_ms(timeout_ms);
-    while (!timer.expired())
-    {
-        if (cycle(timer) == FAILURE)
-        {
-            rc = FAILURE;
-            break;
-        }
-    }
-        
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
-{
-    /* get one piece of work off the wire and one pass through */
-
-    // read the socket, see what work is due
-    int packet_type = readPacket(timer);
-    
-    int len = 0,
-        rc = SUCCESS;
-
-    switch (packet_type)
-    {
-        case CONNACK:
-        case PUBACK:
-        case SUBACK:
-            break;
-        case PUBLISH:
-            MQTTString topicName;
-            Message msg;
-            if (MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
-                                 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
-                goto exit;
-            deliverMessage(topicName, msg);
-            if (msg.qos != QOS0)
-            {
-                if (msg.qos == QOS1)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
-                else if (msg.qos == QOS2)
-                    len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
-                if (len <= 0)
-                    rc = FAILURE;
-                else
-                    rc = sendPacket(len, timer);
-                if (rc == FAILURE)
-                    goto exit; // there was a problem
-            }
-            break;
-        case PUBREC:
-            int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
-                rc = FAILURE;
-            else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
-                rc = FAILURE;
-            else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
-                rc = FAILURE; // there was a problem
-            if (rc == FAILURE)
-                goto exit; // there was a problem
-            break;
-        case PUBCOMP:
-            break;
-        case PINGRESP:
-            ping_outstanding = false;
-            break;
-    }
-    keepalive();
-exit:
-    if (rc == SUCCESS)
-        rc = packet_type;
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
-{
-    int rc = FAILURE;
-
-    if (keepAliveInterval == 0)
-    {
-        rc = SUCCESS;
-        goto exit;
-    }
-
-    if (ping_timer.expired())
-    {
-        if (!ping_outstanding)
-        {
-            Timer timer = Timer(1000);
-            int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
-            if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
-                ping_outstanding = true;
-        }
-    }
-
-exit:
-    return rc;
-}
-
-
-// only used in single-threaded mode where one command at a time is in process
-template<class Network, class Timer, int a, int b> 
-int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
-{
-    int rc = FAILURE;
-    
-    do
-    {
-        if (timer.expired()) 
-            break; // we timed out
-    }
-    while ((rc = cycle(timer)) != packet_type);  
-    
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
-{
-    Timer connect_timer = Timer(command_timeout_ms);
-    int rc = FAILURE;
-
-    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
-    if (options == 0)
-        options = &default_options; // set default options if none were supplied
-    
-    this->keepAliveInterval = options->keepAliveInterval;
-    ping_timer.countdown(this->keepAliveInterval);
-    int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, connect_timer)) != SUCCESS)  // send the connect packet
-        goto exit; // there was a problem
-    
-    // this will be a blocking call, wait for the connack
-    if (waitfor(CONNACK, connect_timer) == CONNACK)
-    {
-        int connack_rc = -1;
-        if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = connack_rc;
-        else
-            rc = FAILURE;
-    }
-    else
-        rc = FAILURE;
-    
-exit:
-    if (rc == SUCCESS)
-        isconnected = true;
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
-{ 
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
-    int len = 0;
-    
-    MQTTString topic = {(char*)topicFilter, 0, 0};
-    if (!isconnected)
-        goto exit;
-    
-    len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit;             // there was a problem
-    
-    if (waitfor(SUBACK, timer) == SUBACK)      // wait for suback 
-    {
-        int count = 0, grantedQoS = -1, mypacketid;
-        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = grantedQoS; // 0, 1, 2 or 0x80 
-        if (rc != 0x80)
-        {
-            for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
-            {
-                if (messageHandlers[i].topicFilter == 0)
-                {
-                    messageHandlers[i].topicFilter = topicFilter;
-                    messageHandlers[i].fp.attach(messageHandler);
-                    rc = 0;
-                    break;
-                }
-            }
-        }
-    }
-    else 
-        rc = FAILURE;
-        
-exit:
-    //if (rc == FAILURE)
-    //   closesession();
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
-{   
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
-    
-    MQTTString topic = {(char*)topicFilter, 0, 0};
-    
-    int len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit; // there was a problem
-    
-    if (waitfor(UNSUBACK, timer) == UNSUBACK)
-    {
-        int mypacketid;  // should be the same as the packetid above
-        if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
-            rc = 0; 
-    }
-    else
-        rc = FAILURE;
-    
-exit:
-    return rc;
-}
-
-
-   
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
-{
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);
-    
-    MQTTString topicString = {(char*)topicName, 0, 0};
-
-    if (message->qos == QOS1 || message->qos == QOS2)
-        message->id = packetid.getNext();
-    
-    int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, 
-              topicString, (char*)message->payload, message->payloadlen);
-    if (len <= 0)
-        goto exit;
-    if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
-        goto exit; // there was a problem
-    
-    if (message->qos == QOS1)
-    {
-        if (waitfor(PUBACK, timer) == PUBACK)
-        {
-            int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
-                rc = FAILURE;
-        }
-        else
-            rc = FAILURE;
-    }
-    else if (message->qos == QOS2)
-    {
-        if (waitfor(PUBCOMP, timer) == PUBCOMP)
-        {
-            int type, dup, mypacketid;
-            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
-                rc = FAILURE;
-        }
-        else
-            rc = FAILURE;
-    }
-    
-exit:
-    return rc;
-}
-
-
-template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> 
-int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
-{  
-    int rc = FAILURE;
-    Timer timer = Timer(command_timeout_ms);     // we might wait for incomplete incoming publishes to complete
-    int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
-    if (len > 0)
-        rc = sendPacket(len, timer);            // send the disconnect packet
-
-    return rc;
-}
-
-
-#endif
\ No newline at end of file
--- a/MQTT/MQTTPacket.lib	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-http://mbed.org/teams/mqtt/code/MQTTPacket/#3893bc7343f4
--- a/UbloxLib.lib	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-http://mbed.org/teams/ublox/code/UbloxLib/#caf0014375cb
--- a/main.cpp	Tue May 20 14:33:32 2014 +0000
+++ b/main.cpp	Tue May 27 09:23:07 2014 +0000
@@ -24,7 +24,6 @@
  
  */
 #include "mbed.h"
-#include "MbedIPStack.h"
 
 //------------------------------------------------------------------------------------
 // You need to configure these cellular modem / SIM parameters.
@@ -44,38 +43,44 @@
 //------------------------------------------------------------------------------------
 
 #include "C12832.h"
+#if defined(TARGET_LPC1768) && !defined(TARGET_UBLOX_C027)
+C12832 lcd(p5, p7, p6, p8, p11);    // The classic TARGET_MBED_LPC1768
+#else
 C12832 lcd(D11, D13, D12, D7, D10);
+#endif
 
-#include "FP.cpp"
+#include "MQTTSocket.h"
 #include "MQTTClient.h"
 
 int arrivedcount = 0;
-
-void messageArrived(MQTT::Message* message)
+ 
+void messageArrived(MQTT::MessageData& md)
 {
+    MQTT::Message &message = md.message;
     lcd.cls();
     lcd.locate(0,3);
-    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
-    printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
+    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
+    printf("Payload %.*s\n", message.payloadlen, (char*)message.payload);
     ++arrivedcount;
-    lcd.puts((char*)message->payload);
+    lcd.puts((char*)message.payload);
 }
-
+ 
+ 
 int main(int argc, char* argv[])
 {   
     MDMSerial mdm;
     //mdm.setDebug(4); // enable this for debugging issues 
     if (!mdm.connect(SIMPIN, APN,USERNAME,PASSWORD))
         return -1;
-            
-    IPStack ipstack = IPStack();
-    float version = 0.43;
+    
+    MQTTSocket ipstack = MQTTSocket();
+    float version = 0.47;
     char* topic = "mbed-sample";
     
     lcd.printf("Version is %f\n", version);
     printf("Version is %f\n", version);
               
-    MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
+    MQTT::Client<MQTTSocket, Countdown> client = MQTT::Client<MQTTSocket, Countdown>(ipstack);
     
     char* hostname = "m2m.eclipse.org";
     int port = 1883;
@@ -86,18 +91,15 @@
  
     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
     data.MQTTVersion = 3;
-    data.clientID.cstring = "mbed-icraggs";
-    rc = client.connect(&data);
-    if (rc != 0)
+    data.clientID.cstring = "mbed-sample";
+    if ((rc = client.connect(&data)) != 0)
         lcd.printf("rc from MQTT connect is %d\n", rc);
     
-    rc = client.subscribe(topic, MQTT::QOS1, messageArrived);   
-    if (rc != 0) {
-        printf("rc from MQTT subscribe is %d\n", rc);
-    }
-
+    if ((rc = client.subscribe(topic, MQTT::QOS1, messageArrived)) != 0)
+        lcd.printf("rc from MQTT subscribe is %d\n", rc);
+ 
     MQTT::Message message;
-
+ 
     // QoS 0
     char buf[100];
     sprintf(buf, "Hello World!  QoS 0 message from app version %f\n", version);
@@ -126,23 +128,20 @@
     while (arrivedcount == 2)
         client.yield(100);
     
-    rc = client.unsubscribe(topic);
-    if (rc != 0) {
+    if ((rc = client.unsubscribe(topic)) != 0)
         printf("rc from unsubscribe was %d\n", rc);
-    }
     
-    rc = client.disconnect();
-    if (rc != 0) {
+    if ((rc = client.disconnect()) != 0)
         printf("rc from disconnect was %d\n", rc);
-    }
-
+    
+    ipstack.disconnect();
     mdm.disconnect();
     mdm.powerOff();
     
     lcd.cls();
     lcd.locate(0,3);
-    lcd.printf("Finish: %d msgs\n", arrivedcount);
+    lcd.printf("Version %.2f: finish %d msgs\n", version, arrivedcount);
     printf("Finishing with %d messages received\n", arrivedcount);
     
-    while(true);
-}
+    return 0;
+}
\ No newline at end of file
--- a/mbed-rtos.lib	Tue May 20 14:33:32 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-http://mbed.org/users/mbed_official/code/mbed-rtos/#5dfe422a963d