My goal with this project was to create a quick operations console to interface with embedded hardware for messaging and switching. As I am part of an active project called Kivy (kivy.org), I figured it would be a perfect tool to provide the user interface for the console. Kivy is a cross-platform development framework based on Python and the C language. For further reading, I urge you to visit the http://kivy.org website, download a copy of the framework for your operating system, and run some examples. The API is very easy to use and feature-rich: it includes 2D and 3D support, standard widget libraries, and advanced multitouch gesture support, all running on the hardware GPU on Microsoft Windows, Apple Mac OS X, iOS for the iPhone and iPad, Android, and Linux. The example code I am providing in this blog entry can easily be built simultaneously for each of the above operating systems. See http://mctouch.wordpress.com
Dependencies: EthernetNetIf TextLCD mbed
MQTTClient.cpp
00001 #include "MQTTClient.h" 00002 00003 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) 00004 { 00005 this->port = port; 00006 callback_server = callback; 00007 serverIp = server; 00008 connected = false; 00009 sessionOpened = false; 00010 timer.start(); 00011 } 00012 00013 MQTTClient::~MQTTClient() 00014 { 00015 } 00016 00017 int MQTTClient::send_data(const char* msg, int size) 00018 { 00019 int transLen = pTCPSocket->send(msg, size); 00020 00021 /*Check send length matches the message length*/ 00022 if(transLen != size){ 00023 for(int i = 0; i < size; i++) { 00024 printf("%x, ", msg[i]); 00025 } 00026 printf("Error on send.\r\n"); 00027 return -1; 00028 } 00029 return 1; 00030 } 00031 00032 int MQTTClient::open_session(char* id) 00033 { 00034 /*variable header*/ 00035 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)}; 00036 00037 /*fixed header: 2 bytes, big endian*/ 00038 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2}; 00039 00040 char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)]; 00041 00042 memset(packet,0,sizeof(packet)); 00043 memcpy(packet,fixed_header,sizeof(fixed_header)); 00044 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header)); 00045 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id)); 00046 00047 /* 00048 for(int i = 0; i < sizeof(packet); i++) { 00049 printf("%x, ", packet[i]); 00050 } 00051 printf("\r\n");*/ 00052 00053 if(!send_data(packet, sizeof(packet))) 00054 { 00055 return -1; 00056 } 00057 return 1; 00058 } 00059 00060 int MQTTClient::connect(char* id) 00061 { 00062 clientId = id; 00063 00064 /*Initial TCP socket*/ 00065 pTCPSocket = new TCPSocket; 00066 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); 00067 00068 host.setPort(1883); 00069 host.setIp(serverIp); 00070 host.setName("localhost"); 00071 00072 /*Trying to connect to host*/ 00073 printf("Trying to connect to host..\r\n\r\n"); 
00074 TCPSocketErr err = pTCPSocket->connect(host); 00075 00076 Net::poll(); 00077 if(err) 00078 { 00079 printf("Error connecting to host [%d]\r\n", (int) err); 00080 return -1; 00081 } 00082 printf("Connect to host sucessed..\r\n\r\n"); 00083 00084 /*Wait TCP connection with server to be established*/ 00085 int i = 0; 00086 while(!connected) 00087 { 00088 Net::poll(); 00089 wait(1); 00090 i++; 00091 printf("Wait for connections %d..\r\n", i); 00092 if(i == 35) 00093 {//If wait too long, give up. 00094 return -1; 00095 } 00096 } 00097 00098 /*Send open session message to server*/ 00099 open_session(id); 00100 00101 /*Wait server notice of open sesion*/ 00102 while(!sessionOpened) 00103 { 00104 Net::poll(); 00105 wait(1); 00106 if(!connected){ 00107 break; 00108 } 00109 printf("Wait for session..\r\n"); 00110 } 00111 if(!connected) 00112 { 00113 return -2; 00114 } 00115 lastActivity = timer.read_ms(); 00116 return 1; 00117 } 00118 00119 int MQTTClient::publish(char* pub_topic, char* msg) 00120 { 00121 uint8_t var_header_pub[strlen(pub_topic)+3]; 00122 strcpy((char *)&var_header_pub[2], pub_topic); 00123 var_header_pub[0] = 0; 00124 var_header_pub[1] = strlen(pub_topic); 00125 var_header_pub[sizeof(var_header_pub)-1] = 0; 00126 00127 uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; 00128 00129 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)]; 00130 memset(packet_pub,0,sizeof(packet_pub)); 00131 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub)); 00132 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub)); 00133 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg)); 00134 00135 if(!send_data((char*)packet_pub, sizeof(packet_pub))) 00136 { 00137 return -1; 00138 } 00139 return 1; 00140 } 00141 00142 00143 void MQTTClient::disconnect() 00144 { 00145 char packet_224[] = {0xe0, 0x00}; 00146 send_data((char*)packet_224, 2); 00147 00148 connected = 
false; 00149 } 00150 00151 void MQTTClient::read_data() 00152 { 00153 char buffer[1024]; 00154 int len = 0, readLen; 00155 00156 while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){ 00157 len += readLen; 00158 } 00159 00160 buffer[len] = '\0'; 00161 00162 /* 00163 printf("Read length: %d %d\r\n", len, readLen); 00164 00165 for(int i = 0; i < len; i++) { 00166 printf("%x\r\n", buffer[i],buffer[i]); 00167 } 00168 printf("\r\n"); 00169 */ 00170 char type = buffer[0]>>4; 00171 if (type == 3) { // PUBLISH 00172 if (callback_server) { 00173 uint8_t tl = (buffer[2]<<3)+buffer[3]; 00174 char topic[tl+1]; 00175 for (int i=0;i<tl;i++) { 00176 topic[i] = buffer[4+i]; 00177 } 00178 topic[tl] = 0; 00179 // ignore msgID - only support QoS 0 subs 00180 char *payload = buffer+4+tl; 00181 callback_server(topic,(char*)payload); 00182 } 00183 } else if (type == 12) { // PINGREG -- Ask for alive 00184 char packet_208[] = {0xd0, 0x00}; 00185 send_data((char*)packet_208, 2); 00186 lastActivity = timer.read_ms(); 00187 } 00188 } 00189 00190 void MQTTClient::read_open_session() 00191 { 00192 char buffer[32]; 00193 int len = 0, readLen; 00194 00195 while((readLen = pTCPSocket->recv(buffer, 32)) != 0) 00196 { 00197 len += readLen; 00198 } 00199 00200 if(len == 4 && buffer[3] == 0) 00201 { 00202 printf("Session opened\r\n"); 00203 sessionOpened = true; 00204 } 00205 } 00206 00207 int MQTTClient::subscribe(char* topic) 00208 { 00209 00210 if (connected) 00211 { 00212 uint8_t var_header_topic[] = {0,10}; 00213 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3}; 00214 00215 //utf topic 00216 uint8_t utf_topic[strlen(topic)+3]; 00217 strcpy((char *)&utf_topic[2], topic); 00218 00219 utf_topic[0] = 0; 00220 utf_topic[1] = strlen(topic); 00221 utf_topic[sizeof(utf_topic)-1] = 0; 00222 00223 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3]; 00224 memset(packet_topic,0,sizeof(packet_topic)); 00225 
memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic)); 00226 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic)); 00227 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic)); 00228 00229 if (!send_data(packet_topic, sizeof(packet_topic))) 00230 { 00231 return -1; 00232 } 00233 return 1; 00234 } 00235 return -1; 00236 } 00237 00238 void MQTTClient::live() 00239 { 00240 if (connected) 00241 { 00242 int t = timer.read_ms(); 00243 if (t - lastActivity > KEEPALIVE) { 00244 //Send 192 0 to broker 00245 printf("Send 192\r\n"); 00246 char packet_192[] = {0xc0, 0x00}; 00247 send_data((char*)packet_192, 2); 00248 lastActivity = t; 00249 } 00250 } 00251 } 00252 00253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) 00254 { 00255 switch(e) 00256 { 00257 case TCPSOCKET_ACCEPT: 00258 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n"); 00259 break; 00260 case TCPSOCKET_CONNECTED: 00261 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n"); 00262 connected = true; 00263 break; 00264 case TCPSOCKET_WRITEABLE: 00265 printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); 00266 break; 00267 case TCPSOCKET_READABLE: 00268 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n"); 00269 if(!sessionOpened) 00270 { 00271 read_open_session(); 00272 } 00273 read_data(); 00274 break; 00275 case TCPSOCKET_CONTIMEOUT: 00276 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n"); 00277 break; 00278 case TCPSOCKET_CONRST: 00279 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n"); 00280 break; 00281 case TCPSOCKET_CONABRT: 00282 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n"); 00283 break; 00284 case TCPSOCKET_ERROR: 00285 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n"); 00286 break; 00287 case TCPSOCKET_DISCONNECTED: 00288 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n"); 00289 pTCPSocket->close(); 00290 connected = false; 00291 break; 00292 } 00293 }
Generated on Thu Jul 14 2022 10:15:43 by 1.7.2