My goal with this project was to create a quick operations console to interface with embedded hardware for messaging and switching. As I am part of an active project called Kivy (kivy.org), I figured it would be the perfect tool for providing the console's user interface. Kivy is a cross-platform development framework based on Python and the C language. For further reading, I urge you to visit the http://kivy.org website, download a copy of the framework for your operating system, and run some of the examples. The API is very easy to use and feature-rich: it includes 2D and 3D support, standard widget libraries, and advanced multitouch gesture support, all running on the hardware GPU on Microsoft Windows, Apple Mac OS X, iOS (iPhone and iPad), Android, and Linux. The example code I am providing in this blog entry can easily be built simultaneously for each of the above operating systems. See http://mctouch.wordpress.com

Dependencies:   EthernetNetIf TextLCD mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 #include "MQTTClient.h"
00002 
00003 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*))
00004 {
00005     this->port = port;
00006     callback_server = callback;
00007     serverIp = server;
00008     connected = false;
00009     sessionOpened = false;
00010     timer.start();
00011 }
00012 
00013 MQTTClient::~MQTTClient()
00014 {
00015 }
00016 
00017 int MQTTClient::send_data(const char* msg, int size)
00018 {
00019     int transLen = pTCPSocket->send(msg, size);
00020     
00021     /*Check send length matches the message length*/
00022     if(transLen != size){
00023         for(int i = 0; i < size; i++) {
00024             printf("%x, ", msg[i]);
00025         }
00026         printf("Error on send.\r\n");
00027         return -1;
00028     }
00029     return 1;
00030 }
00031 
00032 int MQTTClient::open_session(char* id)
00033 {
00034     /*variable header*/
00035     char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
00036     
00037     /*fixed header: 2 bytes, big endian*/
00038     char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
00039     
00040     char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)];
00041     
00042     memset(packet,0,sizeof(packet));
00043     memcpy(packet,fixed_header,sizeof(fixed_header));
00044     memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
00045     memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
00046     
00047     /*
00048      for(int i = 0; i < sizeof(packet); i++) {
00049      printf("%x, ", packet[i]);
00050      }
00051      printf("\r\n");*/
00052     
00053     if(!send_data(packet, sizeof(packet)))
00054     {
00055         return -1;
00056     }
00057     return 1;
00058 }
00059 
00060 int MQTTClient::connect(char* id)
00061 {
00062     clientId = id;
00063     
00064     /*Initial TCP socket*/
00065     pTCPSocket = new TCPSocket;
00066     pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
00067     
00068     host.setPort(1883);
00069     host.setIp(serverIp);
00070     host.setName("localhost");
00071     
00072     /*Trying to connect to host*/
00073     printf("Trying to connect to host..\r\n\r\n");
00074     TCPSocketErr err = pTCPSocket->connect(host);
00075     
00076     Net::poll();   
00077     if(err)
00078     {
00079         printf("Error connecting to host [%d]\r\n", (int) err);
00080         return -1;
00081     } 
00082     printf("Connect to host sucessed..\r\n\r\n");    
00083     
00084     /*Wait TCP connection with server to be established*/
00085     int i = 0;
00086     while(!connected)
00087     {
00088         Net::poll();
00089         wait(1);
00090         i++;
00091         printf("Wait for connections %d..\r\n", i);
00092         if(i == 35)
00093         {//If wait too long, give up.
00094             return -1;
00095         }
00096     }
00097     
00098     /*Send open session message to server*/
00099     open_session(id);
00100     
00101     /*Wait server notice of open sesion*/
00102     while(!sessionOpened)
00103     {
00104         Net::poll();
00105         wait(1);
00106         if(!connected){
00107             break;
00108         }
00109         printf("Wait for session..\r\n");
00110     }
00111     if(!connected)
00112     {
00113         return -2;
00114     }
00115     lastActivity = timer.read_ms();
00116     return 1;
00117 }
00118 
00119 int MQTTClient::publish(char* pub_topic, char* msg) 
00120 {
00121     uint8_t var_header_pub[strlen(pub_topic)+3];
00122     strcpy((char *)&var_header_pub[2], pub_topic);
00123     var_header_pub[0] = 0;
00124     var_header_pub[1] = strlen(pub_topic);
00125     var_header_pub[sizeof(var_header_pub)-1] = 0;
00126     
00127     uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
00128     
00129     uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
00130     memset(packet_pub,0,sizeof(packet_pub));
00131     memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
00132     memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
00133     memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
00134     
00135     if(!send_data((char*)packet_pub, sizeof(packet_pub)))
00136     {
00137         return -1;
00138     }
00139     return 1;
00140 }
00141 
00142 
00143 void MQTTClient::disconnect() 
00144 {
00145     char packet_224[] = {0xe0, 0x00};
00146     send_data((char*)packet_224, 2);
00147     
00148     connected = false;
00149 }
00150 
00151 void MQTTClient::read_data()
00152 {
00153     char buffer[1024];
00154     int len = 0, readLen;
00155     
00156     while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){
00157         len += readLen;
00158     }
00159     
00160     buffer[len] = '\0';
00161     
00162     /*
00163      printf("Read length: %d %d\r\n", len, readLen);
00164      
00165      for(int i = 0; i < len; i++) {
00166      printf("%x\r\n", buffer[i],buffer[i]);
00167      }
00168      printf("\r\n");
00169      */
00170     char type = buffer[0]>>4;
00171     if (type == 3) { // PUBLISH
00172         if (callback_server) {
00173             uint8_t tl = (buffer[2]<<3)+buffer[3];
00174             char topic[tl+1];
00175             for (int i=0;i<tl;i++) {
00176                 topic[i] = buffer[4+i];
00177             }
00178             topic[tl] = 0;
00179             // ignore msgID - only support QoS 0 subs
00180             char *payload = buffer+4+tl;
00181             callback_server(topic,(char*)payload);
00182         }
00183     } else if (type == 12) { // PINGREG -- Ask for alive
00184         char packet_208[] = {0xd0, 0x00};
00185         send_data((char*)packet_208, 2);
00186         lastActivity = timer.read_ms();
00187     }
00188 }
00189 
00190 void MQTTClient::read_open_session()
00191 {
00192     char buffer[32];
00193     int len = 0, readLen;
00194     
00195     while((readLen = pTCPSocket->recv(buffer, 32)) != 0)
00196     {
00197         len += readLen;
00198     }
00199     
00200     if(len == 4 && buffer[3] == 0)
00201     {
00202         printf("Session opened\r\n");
00203         sessionOpened = true;
00204     }
00205 }
00206 
00207 int MQTTClient::subscribe(char* topic)
00208 {
00209     
00210     if (connected) 
00211     {
00212         uint8_t var_header_topic[] = {0,10};    
00213         uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
00214         
00215         //utf topic
00216         uint8_t utf_topic[strlen(topic)+3];
00217         strcpy((char *)&utf_topic[2], topic);
00218         
00219         utf_topic[0] = 0;
00220         utf_topic[1] = strlen(topic);
00221         utf_topic[sizeof(utf_topic)-1] = 0;
00222         
00223         char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
00224         memset(packet_topic,0,sizeof(packet_topic));
00225         memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
00226         memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
00227         memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
00228         
00229         if (!send_data(packet_topic, sizeof(packet_topic)))
00230         {
00231             return -1;
00232         }
00233         return 1;
00234     }
00235     return -1;
00236 }
00237 
00238 void MQTTClient::live() 
00239 {
00240     if (connected) 
00241     {
00242         int t = timer.read_ms();
00243         if (t - lastActivity > KEEPALIVE) {
00244             //Send 192 0 to broker
00245             printf("Send 192\r\n");
00246             char packet_192[] = {0xc0, 0x00};
00247             send_data((char*)packet_192, 2);
00248             lastActivity = t;
00249         }
00250     }
00251 }
00252 
00253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e)
00254 {
00255     switch(e)
00256     {
00257         case TCPSOCKET_ACCEPT:
00258             printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
00259             break;
00260         case TCPSOCKET_CONNECTED: 
00261             printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
00262             connected = true;
00263             break;
00264         case TCPSOCKET_WRITEABLE: 
00265             printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");  
00266             break;
00267         case TCPSOCKET_READABLE:
00268             printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
00269             if(!sessionOpened)
00270             {
00271                 read_open_session();
00272             }
00273             read_data();
00274             break;
00275         case TCPSOCKET_CONTIMEOUT:
00276             printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
00277             break;
00278         case TCPSOCKET_CONRST:
00279             printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
00280             break;
00281         case TCPSOCKET_CONABRT:
00282             printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
00283             break;
00284         case TCPSOCKET_ERROR:
00285             printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
00286             break;
00287         case TCPSOCKET_DISCONNECTED:
00288             printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
00289             pTCPSocket->close();
00290             connected = false;
00291             break;
00292     }
00293 }