My goal with this project was to create a quick operations console to interface with embedded hardware for messaging and switching. As I am part of an active project called Kivy, I figured it would be a perfect tool to provide the user interface for the console. Kivy is a cross-platform development framework based on Python and the C language. For further reading, I urge you to click over to the website, download a copy of the framework for your operating system, and run some examples. The API is ultra easy and very feature-rich, including 2D and 3D support with standard widget libraries and advanced multitouch gesture support, all of which runs on the hardware GPU on Microsoft Windows, Apple Mac OS X, iOS (for the iPhone and iPad), Android, and Linux. The example code I am providing in this blog entry can easily be built simultaneously for each of the above operating systems. See

Dependencies:   EthernetNetIf TextLCD mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File


00001 #include "MQTTClient.h"
00003 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*))
00004 {
00005     this->port = port;
00006     callback_server = callback;
00007     serverIp = server;
00008     connected = false;
00009     sessionOpened = false;
00010     timer.start();
00011 }
00013 MQTTClient::~MQTTClient()
00014 {
00015 }
00017 int MQTTClient::send_data(const char* msg, int size)
00018 {
00019     int transLen = pTCPSocket->send(msg, size);
00021     /*Check send length matches the message length*/
00022     if(transLen != size){
00023         for(int i = 0; i < size; i++) {
00024             printf("%x, ", msg[i]);
00025         }
00026         printf("Error on send.\r\n");
00027         return -1;
00028     }
00029     return 1;
00030 }
00032 int MQTTClient::open_session(char* id)
00033 {
00034     /*variable header*/
00035     char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
00037     /*fixed header: 2 bytes, big endian*/
00038     char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
00040     char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)];
00042     memset(packet,0,sizeof(packet));
00043     memcpy(packet,fixed_header,sizeof(fixed_header));
00044     memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
00045     memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
00047     /*
00048      for(int i = 0; i < sizeof(packet); i++) {
00049      printf("%x, ", packet[i]);
00050      }
00051      printf("\r\n");*/
00053     if(!send_data(packet, sizeof(packet)))
00054     {
00055         return -1;
00056     }
00057     return 1;
00058 }
00060 int MQTTClient::connect(char* id)
00061 {
00062     clientId = id;
00064     /*Initial TCP socket*/
00065     pTCPSocket = new TCPSocket;
00066     pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
00068     host.setPort(1883);
00069     host.setIp(serverIp);
00070     host.setName("localhost");
00072     /*Trying to connect to host*/
00073     printf("Trying to connect to host..\r\n\r\n");
00074     TCPSocketErr err = pTCPSocket->connect(host);
00076     Net::poll();   
00077     if(err)
00078     {
00079         printf("Error connecting to host [%d]\r\n", (int) err);
00080         return -1;
00081     } 
00082     printf("Connect to host sucessed..\r\n\r\n");    
00084     /*Wait TCP connection with server to be established*/
00085     int i = 0;
00086     while(!connected)
00087     {
00088         Net::poll();
00089         wait(1);
00090         i++;
00091         printf("Wait for connections %d..\r\n", i);
00092         if(i == 35)
00093         {//If wait too long, give up.
00094             return -1;
00095         }
00096     }
00098     /*Send open session message to server*/
00099     open_session(id);
00101     /*Wait server notice of open sesion*/
00102     while(!sessionOpened)
00103     {
00104         Net::poll();
00105         wait(1);
00106         if(!connected){
00107             break;
00108         }
00109         printf("Wait for session..\r\n");
00110     }
00111     if(!connected)
00112     {
00113         return -2;
00114     }
00115     lastActivity = timer.read_ms();
00116     return 1;
00117 }
00119 int MQTTClient::publish(char* pub_topic, char* msg) 
00120 {
00121     uint8_t var_header_pub[strlen(pub_topic)+3];
00122     strcpy((char *)&var_header_pub[2], pub_topic);
00123     var_header_pub[0] = 0;
00124     var_header_pub[1] = strlen(pub_topic);
00125     var_header_pub[sizeof(var_header_pub)-1] = 0;
00127     uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
00129     uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
00130     memset(packet_pub,0,sizeof(packet_pub));
00131     memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
00132     memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
00133     memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
00135     if(!send_data((char*)packet_pub, sizeof(packet_pub)))
00136     {
00137         return -1;
00138     }
00139     return 1;
00140 }
00143 void MQTTClient::disconnect() 
00144 {
00145     char packet_224[] = {0xe0, 0x00};
00146     send_data((char*)packet_224, 2);
00148     connected = false;
00149 }
00151 void MQTTClient::read_data()
00152 {
00153     char buffer[1024];
00154     int len = 0, readLen;
00156     while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){
00157         len += readLen;
00158     }
00160     buffer[len] = '\0';
00162     /*
00163      printf("Read length: %d %d\r\n", len, readLen);
00165      for(int i = 0; i < len; i++) {
00166      printf("%x\r\n", buffer[i],buffer[i]);
00167      }
00168      printf("\r\n");
00169      */
00170     char type = buffer[0]>>4;
00171     if (type == 3) { // PUBLISH
00172         if (callback_server) {
00173             uint8_t tl = (buffer[2]<<3)+buffer[3];
00174             char topic[tl+1];
00175             for (int i=0;i<tl;i++) {
00176                 topic[i] = buffer[4+i];
00177             }
00178             topic[tl] = 0;
00179             // ignore msgID - only support QoS 0 subs
00180             char *payload = buffer+4+tl;
00181             callback_server(topic,(char*)payload);
00182         }
00183     } else if (type == 12) { // PINGREG -- Ask for alive
00184         char packet_208[] = {0xd0, 0x00};
00185         send_data((char*)packet_208, 2);
00186         lastActivity = timer.read_ms();
00187     }
00188 }
00190 void MQTTClient::read_open_session()
00191 {
00192     char buffer[32];
00193     int len = 0, readLen;
00195     while((readLen = pTCPSocket->recv(buffer, 32)) != 0)
00196     {
00197         len += readLen;
00198     }
00200     if(len == 4 && buffer[3] == 0)
00201     {
00202         printf("Session opened\r\n");
00203         sessionOpened = true;
00204     }
00205 }
00207 int MQTTClient::subscribe(char* topic)
00208 {
00210     if (connected) 
00211     {
00212         uint8_t var_header_topic[] = {0,10};    
00213         uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
00215         //utf topic
00216         uint8_t utf_topic[strlen(topic)+3];
00217         strcpy((char *)&utf_topic[2], topic);
00219         utf_topic[0] = 0;
00220         utf_topic[1] = strlen(topic);
00221         utf_topic[sizeof(utf_topic)-1] = 0;
00223         char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
00224         memset(packet_topic,0,sizeof(packet_topic));
00225         memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
00226         memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
00227         memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
00229         if (!send_data(packet_topic, sizeof(packet_topic)))
00230         {
00231             return -1;
00232         }
00233         return 1;
00234     }
00235     return -1;
00236 }
00238 void MQTTClient::live() 
00239 {
00240     if (connected) 
00241     {
00242         int t = timer.read_ms();
00243         if (t - lastActivity > KEEPALIVE) {
00244             //Send 192 0 to broker
00245             printf("Send 192\r\n");
00246             char packet_192[] = {0xc0, 0x00};
00247             send_data((char*)packet_192, 2);
00248             lastActivity = t;
00249         }
00250     }
00251 }
00253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e)
00254 {
00255     switch(e)
00256     {
00257         case TCPSOCKET_ACCEPT:
00258             printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
00259             break;
00260         case TCPSOCKET_CONNECTED: 
00261             printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
00262             connected = true;
00263             break;
00264         case TCPSOCKET_WRITEABLE: 
00265             printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");  
00266             break;
00267         case TCPSOCKET_READABLE:
00268             printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
00269             if(!sessionOpened)
00270             {
00271                 read_open_session();
00272             }
00273             read_data();
00274             break;
00275         case TCPSOCKET_CONTIMEOUT:
00276             printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
00277             break;
00278         case TCPSOCKET_CONRST:
00279             printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
00280             break;
00281         case TCPSOCKET_CONABRT:
00282             printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
00283             break;
00284         case TCPSOCKET_ERROR:
00285             printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
00286             break;
00287         case TCPSOCKET_DISCONNECTED:
00288             printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
00289             pTCPSocket->close();
00290             connected = false;
00291             break;
00292     }
00293 }