My goal with this project was to create a quick operations console to interface with embedded hardware for messaging and switching. As I am part of an active project called Kivy.org I figured it would be a perfect tool to provide the user interface for the console. Kivy is a cross platform development framework based on Python and the C language. For further reading I urge you to click over to the http://kivy.org website and download a copy of the framework for your operating system and run some examples. The API is ultra easy and very feature rich including 2D & 3D support with standard widget libraries and Advanced multitouch gesturing support that all runs in the hardware GPU on Microsoft windows operating systems, Apple Mac OSx operating systems including IOS for the iPhone and iPad as well as support for Android and Linux. The example code I am providing in this blog entry can easily be built simultaneously for each of the above operating systems. See http://mctouch.wordpress.com

Dependencies:   EthernetNetIf TextLCD mbed

Files at this revision

API Documentation at this revision

Comitter:
mctouch
Date:
Thu Jun 28 21:29:07 2012 +0000
Commit message:
For blog entry

Changed in this revision

EthernetNetIf.lib Show annotated file Show diff for this revision Revisions of this file
MQTTClient.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTClient.h Show annotated file Show diff for this revision Revisions of this file
TextLCD.lib Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed.bld Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/EthernetNetIf.lib	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/sherckuith/code/EthernetNetIf/#479ce5546098
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.cpp	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,293 @@
+#include "MQTTClient.h"
+
+MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*))
+{
+    this->port = port;
+    callback_server = callback;
+    serverIp = server;
+    connected = false;
+    sessionOpened = false;
+    timer.start();
+}
+
+MQTTClient::~MQTTClient()
+{
+}
+
+int MQTTClient::send_data(const char* msg, int size)
+{
+    int transLen = pTCPSocket->send(msg, size);
+    
+    /*Check send length matches the message length*/
+    if(transLen != size){
+        for(int i = 0; i < size; i++) {
+            printf("%x, ", msg[i]);
+        }
+        printf("Error on send.\r\n");
+        return -1;
+    }
+    return 1;
+}
+
+int MQTTClient::open_session(char* id)
+{
+    /*variable header*/
+    char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
+    
+    /*fixed header: 2 bytes, big endian*/
+    char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
+	
+    char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)];
+    
+    memset(packet,0,sizeof(packet));
+    memcpy(packet,fixed_header,sizeof(fixed_header));
+    memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
+    memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
+    
+    /*
+	 for(int i = 0; i < sizeof(packet); i++) {
+	 printf("%x, ", packet[i]);
+	 }
+	 printf("\r\n");*/
+    
+    if(!send_data(packet, sizeof(packet)))
+    {
+        return -1;
+    }
+    return 1;
+}
+
+int MQTTClient::connect(char* id)
+{
+    clientId = id;
+    
+    /*Initial TCP socket*/
+    pTCPSocket = new TCPSocket;
+    pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
+    
+    host.setPort(1883);
+    host.setIp(serverIp);
+    host.setName("localhost");
+    
+    /*Trying to connect to host*/
+    printf("Trying to connect to host..\r\n\r\n");
+    TCPSocketErr err = pTCPSocket->connect(host);
+	
+    Net::poll();   
+    if(err)
+    {
+        printf("Error connecting to host [%d]\r\n", (int) err);
+        return -1;
+    } 
+    printf("Connect to host sucessed..\r\n\r\n");    
+	
+    /*Wait TCP connection with server to be established*/
+    int i = 0;
+    while(!connected)
+    {
+		Net::poll();
+		wait(1);
+		i++;
+		printf("Wait for connections %d..\r\n", i);
+		if(i == 35)
+		{//If wait too long, give up.
+            return -1;
+		}
+    }
+    
+    /*Send open session message to server*/
+    open_session(id);
+    
+    /*Wait server notice of open sesion*/
+    while(!sessionOpened)
+    {
+		Net::poll();
+		wait(1);
+		if(!connected){
+            break;
+		}
+		printf("Wait for session..\r\n");
+    }
+    if(!connected)
+    {
+        return -2;
+    }
+    lastActivity = timer.read_ms();
+    return 1;
+}
+
+int MQTTClient::publish(char* pub_topic, char* msg) 
+{
+    uint8_t var_header_pub[strlen(pub_topic)+3];
+    strcpy((char *)&var_header_pub[2], pub_topic);
+    var_header_pub[0] = 0;
+    var_header_pub[1] = strlen(pub_topic);
+    var_header_pub[sizeof(var_header_pub)-1] = 0;
+    
+    uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
+    
+    uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
+    memset(packet_pub,0,sizeof(packet_pub));
+    memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
+    memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
+    memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
+    
+    if(!send_data((char*)packet_pub, sizeof(packet_pub)))
+    {
+        return -1;
+    }
+    return 1;
+}
+
+
+void MQTTClient::disconnect() 
+{
+    char packet_224[] = {0xe0, 0x00};
+    send_data((char*)packet_224, 2);
+    
+    connected = false;
+}
+
+void MQTTClient::read_data()
+{
+    char buffer[1024];
+    int len = 0, readLen;
+	
+    while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){
+        len += readLen;
+    }
+    
+    buffer[len] = '\0';
+    
+    /*
+	 printf("Read length: %d %d\r\n", len, readLen);
+	 
+	 for(int i = 0; i < len; i++) {
+	 printf("%x\r\n", buffer[i],buffer[i]);
+	 }
+	 printf("\r\n");
+	 */
+    char type = buffer[0]>>4;
+    if (type == 3) { // PUBLISH
+		if (callback_server) {
+			uint8_t tl = (buffer[2]<<3)+buffer[3];
+			char topic[tl+1];
+			for (int i=0;i<tl;i++) {
+				topic[i] = buffer[4+i];
+			}
+			topic[tl] = 0;
+			// ignore msgID - only support QoS 0 subs
+			char *payload = buffer+4+tl;
+			callback_server(topic,(char*)payload);
+		}
+    } else if (type == 12) { // PINGREG -- Ask for alive
+        char packet_208[] = {0xd0, 0x00};
+        send_data((char*)packet_208, 2);
+        lastActivity = timer.read_ms();
+    }
+}
+
+void MQTTClient::read_open_session()
+{
+    char buffer[32];
+    int len = 0, readLen;
+	
+    while((readLen = pTCPSocket->recv(buffer, 32)) != 0)
+    {
+        len += readLen;
+    }
+	
+    if(len == 4 && buffer[3] == 0)
+    {
+        printf("Session opened\r\n");
+        sessionOpened = true;
+    }
+}
+
+int MQTTClient::subscribe(char* topic)
+{
+	
+    if (connected) 
+    {
+        uint8_t var_header_topic[] = {0,10};    
+        uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
+        
+        //utf topic
+        uint8_t utf_topic[strlen(topic)+3];
+        strcpy((char *)&utf_topic[2], topic);
+		
+        utf_topic[0] = 0;
+        utf_topic[1] = strlen(topic);
+        utf_topic[sizeof(utf_topic)-1] = 0;
+        
+        char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
+        memset(packet_topic,0,sizeof(packet_topic));
+        memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
+        memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
+        memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
+        
+        if (!send_data(packet_topic, sizeof(packet_topic)))
+        {
+            return -1;
+        }
+        return 1;
+    }
+    return -1;
+}
+
+void MQTTClient::live() 
+{
+    if (connected) 
+    {
+        int t = timer.read_ms();
+        if (t - lastActivity > KEEPALIVE) {
+            //Send 192 0 to broker
+            printf("Send 192\r\n");
+            char packet_192[] = {0xc0, 0x00};
+            send_data((char*)packet_192, 2);
+            lastActivity = t;
+        }
+    }
+}
+
+void MQTTClient::onTCPSocketEvent(TCPSocketEvent e)
+{
+    switch(e)
+    {
+        case TCPSOCKET_ACCEPT:
+            printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
+            break;
+        case TCPSOCKET_CONNECTED: 
+            printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
+            connected = true;
+            break;
+        case TCPSOCKET_WRITEABLE: 
+            printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");  
+            break;
+        case TCPSOCKET_READABLE:
+            printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
+            if(!sessionOpened)
+            {
+                read_open_session();
+            }
+            read_data();
+            break;
+        case TCPSOCKET_CONTIMEOUT:
+            printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
+            break;
+        case TCPSOCKET_CONRST:
+            printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
+            break;
+        case TCPSOCKET_CONABRT:
+            printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
+            break;
+        case TCPSOCKET_ERROR:
+            printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
+            break;
+        case TCPSOCKET_DISCONNECTED:
+            printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
+            pTCPSocket->close();
+            connected = false;
+            break;
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.h	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,51 @@
+/**
+ *A simple MQTT client for mbed, version 2.0
+ *By Yilun FAN, @CEIT, @JAN 2011
+ *
+ */
+#ifndef MQTT_CLIENT_H
+#define MQTT_CLIENT_H
+
+#include "mbed.h"
+#include "TCPSocket.h"
+
+#define MQTTCONNECT 1<<4
+#define MQTTPUBLISH 3<<4
+#define MQTTSUBSCRIBE 8<<4
+
+#define MAX_PACKET_SIZE 128
+#define KEEPALIVE 15000
+
+class MQTTClient 
+{
+public:
+    MQTTClient(IpAddr server, int port, void (*callback)(char*, char*));
+    ~MQTTClient();
+    int connect(char *);
+    void disconnect();
+    int publish(char *, char *);
+    int subscribe(char *);
+    void live();
+	
+private:
+    int open_session(char* id);
+    void read_open_session();
+    int send_data(const char* msg, int size);
+    void read_data();
+    
+    char* clientId;
+    Timer timer;
+    IpAddr serverIp;
+    int port;
+    bool connected;
+    bool sessionOpened;
+    
+    void onTCPSocketEvent(TCPSocketEvent e);
+    TCPSocket* pTCPSocket;
+    Host host;
+    
+    int lastActivity;
+    void (*callback_server)(char*, char*);
+};
+
+#endif
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TextLCD.lib	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/simon/code/TextLCD/#44f34c09bd37
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main.cpp	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,100 @@
+#include "mbed.h"
+#include "EthernetNetIf.h"
+#include "MQTTClient.h"
+#include "TextLCD.h"
+
+EthernetNetIf ethernet;
+
+Serial pc(USBTX, USBRX);
+
+DigitalOut myled1(LED1);
+DigitalOut myled2(LED2);
+DigitalOut myled3(LED3);
+DigitalOut myled4(LED4);
+
+TextLCD lcd(p15, p16, p17, p18, p19, p20, TextLCD::LCD16x2B); // rs, e, d4-d7
+
+IpAddr serverIpAddr(192,168,0,2);  /*Sever ip address*/
+
+void callback(char* topic, char* payload); /*Callback function prototype*/
+
+MQTTClient mqtt(serverIpAddr, 1883, callback);
+
+void callback(char* topic, char* payload) {
+    printf("Topic: %s\r\n", topic);
+    printf("Payload: %s\r\n\r\n", payload);
+    //Send incoming payloads back to topic "/mbed".
+    printf("Topic:  ");
+    printf(topic, "\r\n");
+    printf("Payload:  ");
+    printf(payload, "\r\n");
+
+    char *string_1 = "LED1";
+    char *string_2 = "LED2";
+    char *string_3 = "LED3";
+    char *string_4 = "LED4";
+
+    if (strcmp(payload, string_1)==0) {
+        printf(payload);
+        printf("Setting LED1");
+        myled1 = 1;
+    } else if (strcmp(payload, string_2)==0) {
+        printf(payload);
+        printf("'Setting LED2");
+        myled2 = 1;
+    } else if (strcmp(payload, string_3)==0) {
+        printf(payload);
+        printf("Setting LED3");
+        myled3 = 1;
+    } else if (strcmp(payload, string_4)==0) {
+        printf(payload);
+        printf("Setting LED4");
+        myled4 = 1;
+    } else {
+        lcd.printf(payload);
+    }
+
+
+    //pc.printf(payload);
+    //while(1) {pc.putc(pc.getc()
+
+    mqtt.publish("/mbed", payload);
+
+
+}
+
+int main() {
+    printf("\r\n############### MQTTClient Session  ##&#65283;&#65283;#######\r\n\r\n");
+
+    EthernetErr ethErr = ethernet.setup();
+    if (ethErr) {
+        printf("Ethernet Error %d\r\n", ethErr);
+        return -1;
+    }
+
+    char clientID[] = "mbed";   /*Client nanme show for MQTT server*/
+    char pub_topic[] = "/mbed";   /*Publish to topic : "/mbed" */
+    char sub_topic[] = "/kivy";   /*Subscribe to topic : "/mirror" */
+
+    if (!mqtt.connect(clientID)) {
+        printf("\r\nConnect to server failed ..\r\n");
+        return -1;
+    }
+    printf("\r\nConnect to server sucessed ..\r\n");
+
+    mqtt.publish(pub_topic, "Hello here is mbed...");
+    mqtt.subscribe(sub_topic);
+
+    int flag = 0;
+    /*Keep alive for 300s or 5mins*/
+    while (flag < 300) {
+        Net::poll();
+        wait(1);
+        flag++;
+        mqtt.live();
+    }
+
+    mqtt.disconnect();
+
+    printf("#### End of the session.. ####\r\n\r\n");
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mbed.bld	Thu Jun 28 21:29:07 2012 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/mbed_official/code/mbed/builds/737756e0b479