An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time

Files at this revision

API Documentation at this revision

Comitter:
Ian Craggs
Date:
Wed Apr 09 11:37:28 2014 +0100
Parent:
6:4d312a49200b
Child:
8:c46930bd6c82
Commit message:
Fixed some bugs

Changed in this revision

MQTTClient.cpp Show annotated file Show diff for this revision Revisions of this file
linux_main.cpp Show annotated file Show diff for this revision Revisions of this file
--- a/MQTTClient.cpp	Tue Apr 08 22:54:37 2014 +0000
+++ b/MQTTClient.cpp	Wed Apr 09 11:37:28 2014 +0100
@@ -21,8 +21,8 @@
 {
     
    buf = new char[buffer_size];
-   readbuf = new char[buffer_size];
-   this->ipstack = ipstack;
+   readbuf = new char[buffer_size];
+   buflen = readbuflen = buffer_size;
    this->command_timeout = command_timeout;
    //this->thread = new Thread(0); // only need a background thread for non-blocking mode
    this->ipstack = network;
@@ -153,7 +153,8 @@
 	}
 	
 	this->keepalive = options->keepAliveInterval;
-	len = MQTTSerialize_connect(buf, buflen, options);
+	len = MQTTSerialize_connect(buf, buflen, options);
+	printf("len from send is %d %d\n", len, buflen);
     rc = sendPacket(len); // send the connect packet
 	printf("rc from send is %d\n", rc);
     
@@ -205,4 +206,4 @@
     }
 	
 	return rc;
-}
\ No newline at end of file
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/linux_main.cpp	Wed Apr 09 11:37:28 2014 +0100
@@ -0,0 +1,232 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+
+#include <pthread.h>
+
+#include "MQTTClient.h"
+#include "MQTTClient.cpp"
+#include "FP.cpp"
+
+#define DEFAULT_STACK_SIZE -1
+
+class Thread {
+public:
+
+    Thread(void* (*fn)(void *parameter), void *parameter=NULL,
+           int priority=0,
+           uint32_t stack_size=DEFAULT_STACK_SIZE,
+           unsigned char *stack_pointer=NULL)
+	{
+		thread = 0;
+		pthread_attr_t attr;
+
+		pthread_attr_init(&attr);
+		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+		if (pthread_create(&thread, &attr, fn, parameter) != 0)
+			thread = 0;
+		pthread_attr_destroy(&attr);
+	}
+    
+private:
+	pthread_t thread;
+};
+
+
+class IPStack 
+{
+public:    
+    IPStack()
+    {
+
+    }
+    
+	int Socket_error(const char* aString)
+	{
+
+		if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
+		{
+			if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
+				printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
+		}
+		return errno;
+	}
+
+    int connect(const char* hostname, int port)
+    {
+		int type = SOCK_STREAM;
+		struct sockaddr_in address;
+		int rc = -1;
+		sa_family_t family = AF_INET;
+		struct addrinfo *result = NULL;
+		struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
+
+		if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
+		{
+			struct addrinfo* res = result;
+
+			/* prefer ip4 addresses */
+			while (res)
+			{
+				if (res->ai_family == AF_INET)
+				{
+					result = res;
+					break;
+				}
+				res = res->ai_next;
+			}
+
+			if (result->ai_family == AF_INET)
+			{
+				address.sin_port = htons(port);
+				address.sin_family = family = AF_INET;
+				address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
+			}
+			else
+				rc = -1;
+
+			freeaddrinfo(result);
+		}
+
+		if (rc == 0)
+		{
+			mysock = socket(family, type, 0);
+			if (mysock != -1)
+			{
+				int opt = 1;
+
+				//if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
+				//	printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
+				
+				rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
+			}
+		}
+
+        return rc;
+    }
+
+    int read(char* buffer, int len, int timeout)
+    {
+		printf("reading %d bytes\n", len);
+		int rc = ::recv(mysock, buffer, (size_t)len, 0);
+		if (rc == -1)
+			Socket_error("read");
+		printf("read %d bytes\n", rc);
+		return rc;
+    }
+    
+    int write(char* buffer, int len, int timeout)
+    {
+		return ::write(mysock, buffer, len);
+    }
+    
+private:
+
+    int mysock; 
+    
+};
+
+
+class Timer
+{
+public:
+
+	Timer()
+	{
+		reset();
+	}
+
+	void start()
+	{
+		if (running)
+			return;
+		if (!timerisset(&stop_time))
+			gettimeofday(&start_time, NULL);
+		else 
+		{
+			struct timeval now, res;
+
+			gettimeofday(&now, NULL);
+
+			timersub(&now, &stop_time, &res); // interval to be added to start time
+			timeradd(&start_time, &res, &stop_time);
+			stop_time = start_time;
+			timerclear(&stop_time);
+		}
+		running = true;
+	}
+
+	void stop()
+	{
+		if (running)
+		{
+			gettimeofday(&stop_time, NULL);
+			running = false;
+		}
+	}
+
+	void reset()
+	{
+		timerclear(&start_time);
+		timerclear(&stop_time);
+		running = false;
+	}
+
+	int read_ms() // get the time passed in milli-seconds
+	{
+		struct timeval now, res;
+
+		gettimeofday(&now, NULL);
+		timersub(&now, &start_time, &res);
+		return (res.tv_sec)*1000 + (res.tv_usec)/1000;
+	}
+
+private:
+
+	struct timeval start_time, stop_time;
+	bool running;
+
+};
+
+
+void messageArrived(MQTT::Message* message)
+{
+}
+
+
+int main(int argc, char* argv[])
+{   
+    IPStack ipstack = IPStack();
+	Timer t;
+	FP<void, MQTT::Message*> messageArrivedPointer;
+
+	messageArrivedPointer.attach(messageArrived);
+      
+    int rc = ipstack.connect("127.0.0.1", 1883);
+	printf("rc from TCP connect is %d\n", rc);
+        
+    MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t);
+
+	printf("constructed\n");
+    
+    rc = client.connect();
+	printf("rc from connect is %d\n", rc);
+
+	rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer);
+	sleep(1);
+}