MQTTClient
Dependents: IoTGateway_Basic test
MQTTClient.cpp
00001 /* 00002 MQTTClient.cpp 00003 Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20 00004 A simple MQTT client for mbed, version 2.0 00005 By Yilun FAN, @CEIT, @JAN 2011 00006 00007 Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk) 00008 00009 Permission is hereby granted, free of charge, to any person obtaining a copy 00010 of this software and associated documentation files (the "Software"), to deal 00011 in the Software without restriction, including without limitation the rights 00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00013 copies of the Software, and to permit persons to whom the Software is 00014 furnished to do so, subject to the following conditions: 00015 00016 The above copyright notice and this permission notice shall be included in 00017 all copies or substantial portions of the Software. 00018 00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00025 THE SOFTWARE. 00026 */ 00027 00028 #include "MQTTClient.h" 00029 00030 /** Default Constructor 00031 */ 00032 MQTTClient::MQTTClient() {} 00033 00034 /** Default Destructor 00035 */ 00036 MQTTClient::~MQTTClient() {} 00037 00038 /** Alternative Constructor with parameters 00039 * 00040 * Allow object to be constructed with minimum parameters. 00041 * 00042 * @param server The IP address of the server to connect to 00043 * @param port The TCP/IP port on the server to connect to 00044 * @param callback Callback function to handle subscription to topics 00045 */ 00046 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) { 00047 this->port = port; 00048 callback_server = callback; 00049 serverIp = server; 00050 this->userName = NULL; 00051 this->password = NULL; 00052 connected = false; 00053 sessionOpened = false; 00054 timer.start(); 00055 } 00056 00057 00058 /** MQTT initialisation method 00059 * 00060 * Used when default constructor used and need to specify parameters at runtime 00061 * 00062 * @param server The IP address of the server to connect to 00063 * @param port The TCP/IP port on the server to connect to 00064 * @param callback Callback function to handle subscription to topics 00065 */ 00066 void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) { 00067 this->port = port; 00068 callback_server = callback; 00069 serverIp = *server; 00070 this->userName = NULL; 00071 this->password = NULL; 00072 connected = false; 00073 sessionOpened = false; 00074 timer.start(); 00075 } 00076 00077 /** A brief description of the function foo 00078 * 00079 * More details about the function goes here 00080 * and here 00081 * 00082 * @param server The IP address of the server to connect to 00083 * @param port The TCP/IP port on the server to connect to 00084 * @param userName Pointer to username string, zero terminated. Null for not used 00085 * @param password Pointer to password string, zero terminated. Null for not used 00086 * @param callback Callback function to handle subscription to topics 00087 */ 00088 void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) { 00089 this->port = port; 00090 callback_server = callback; 00091 serverIp = *server; 00092 this->userName = userName; 00093 this->password = password; 00094 connected = false; 00095 sessionOpened = false; 00096 timer.start(); 00097 } 00098 00099 /** Send a message of specified size 00100 * 00101 * @param msg message string 00102 * @param size Size of the message to send 00103 * @returns value to indicate message sent successfully or not, -1 for error, 1 for success. 00104 */ 00105 int MQTTClient::send_data(const char* msg, int size) { 00106 int transLen = pTCPSocket->send(msg, size); 00107 00108 /*Check send length matches the message length*/ 00109 if (transLen != size) { 00110 for (int i = 0; i < size; i++) { 00111 printf("%x, ", msg[i]); 00112 } 00113 printf("Error on send.\r\n"); 00114 return -1; 00115 } 00116 return 1; 00117 } 00118 00119 /** Start a MQTT session, build CONNECT packet 00120 * 00121 * @param id The client name shown on MQTT server. 00122 * @returns -1 for error, 1 for success 00123 */ 00124 int MQTTClient::open_session(char* id) { 00125 /*variable header*/ 00126 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)}; 00127 00128 /*fixed header: 2 bytes, big endian*/ 00129 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2}; 00130 short usernameLen = strlen( userName ); 00131 short passwordLen = strlen( password ); 00132 var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 ); 00133 var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 ); 00134 00135 // printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen ); 00136 char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + 6 + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ]; 00137 00138 memset(packet,0,sizeof(packet)); 00139 memcpy(packet,fixed_header,sizeof(fixed_header)); 00140 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header)); 00141 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id)); 00142 if ( usernameLen > 0 ) { 00143 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00; 00144 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen; 00145 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen); 00146 packet[1] += (usernameLen + 2); 00147 } 00148 if ( passwordLen > 0 ) { 00149 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00; 00150 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen; 00151 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen); 00152 packet[1] += (passwordLen + 2); 00153 } 00154 00155 /* printf("Packet length %d\n", sizeof(packet)); 00156 for (int i = 0; i < sizeof(packet); i++) { 00157 printf("%x, ", packet[i]); 00158 } 00159 printf("\r\n"); 00160 */ 00161 if (!send_data(packet, sizeof(packet))) { 00162 return -1; 00163 } 00164 // printf("Sent\n"); 00165 return 1; 00166 } 00167 00168 /** Open TCP port, connect to server on given IP address. 00169 * 00170 * @param id The client name shown on MQTT server. 00171 * @returns -1: If connect to server failed. -2: Failed to open session on server. 1: Connection accessed. 00172 */ 00173 int MQTTClient::connect(char* id) { 00174 clientId = id; 00175 00176 /*Initial TCP socket*/ 00177 pTCPSocket = new TCPSocket; 00178 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); 00179 00180 host.setPort(port); 00181 host.setIp(serverIp); 00182 host.setName("localhost"); 00183 00184 /*Trying to connect to host*/ 00185 printf("Trying to connect to host..\r\n\r\n"); 00186 TCPSocketErr err = pTCPSocket->connect(host); 00187 00188 Net::poll(); 00189 if (err) { 00190 printf("Error connecting to host [%d]\r\n", (int) err); 00191 return -1; 00192 } 00193 printf("Connect to host..\r\n\r\n"); 00194 00195 /*Wait TCP connection with server to be established*/ 00196 int i = 0; 00197 while (!connected) { 00198 Net::poll(); 00199 wait(1); 00200 i++; 00201 printf("Wait for connections %d..\r\n", i); 00202 if (i == 35) {//If wait too long, give up. 00203 return -1; 00204 } 00205 } 00206 00207 /*Send open session message to server*/ 00208 open_session(id); 00209 00210 /*Wait server notice of open sesion*/ 00211 i = 0; 00212 while (!sessionOpened) { 00213 Net::poll(); 00214 wait(1); 00215 if (!connected || ++i >40) { 00216 break; 00217 } 00218 printf("Wait for session %d..\r\n", i); 00219 } 00220 if (!connected) { 00221 return -2; 00222 } 00223 lastActivity = timer.read_ms(); 00224 return 1; 00225 } 00226 00227 /** Publish a message on a topic. 00228 * 00229 * @param pub_topic The topic name the massage will be publish on. 00230 * @param msg The massage to be published. 00231 * @returns -1: Failed to publish message. 1: Publish sucessed. 00232 */ 00233 int MQTTClient::publish(char* pub_topic, char* msg) { 00234 uint8_t var_header_pub[strlen(pub_topic)+3]; 00235 strcpy((char *)&var_header_pub[2], pub_topic); 00236 var_header_pub[0] = 0; 00237 var_header_pub[1] = strlen(pub_topic); 00238 var_header_pub[sizeof(var_header_pub)-1] = 0; 00239 00240 uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; 00241 00242 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)]; 00243 memset(packet_pub,0,sizeof(packet_pub)); 00244 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub)); 00245 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub)); 00246 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg)); 00247 00248 if (!send_data((char*)packet_pub, sizeof(packet_pub))) { 00249 return -1; 00250 } 00251 return 1; 00252 } 00253 00254 /** Disconnect from server 00255 */ 00256 void MQTTClient::disconnect() { 00257 char packet_224[] = {0xe0, 0x00}; 00258 send_data((char*)packet_224, 2); 00259 00260 connected = false; 00261 } 00262 00263 /** Read data from receive packet 00264 * Determine what needs to be done with packet. 00265 */ 00266 void MQTTClient::read_data() { 00267 char buffer[1024]; 00268 int len = 0, readLen; 00269 00270 while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) { 00271 len += readLen; 00272 } 00273 00274 buffer[len] = '\0'; 00275 00276 00277 printf("Read length: %d %d\r\n", len, readLen); 00278 00279 for (int i = 0; i < len; i++) { 00280 printf("%2X ", buffer[i]); 00281 } 00282 printf("\r\n"); 00283 00284 char type = buffer[0]>>4; 00285 if ( type == 2 ) { // CONNACK 00286 printf("CONNACK\n"); 00287 00288 } else if (type == 3) { // PUBLISH 00289 if (callback_server) { 00290 short index = 1; 00291 short multiplier = 1; 00292 short value = 0; 00293 uint8_t digit; 00294 do { 00295 digit = buffer[index++]; 00296 value += (digit & 127) * multiplier; 00297 multiplier *= 128; 00298 } while ((digit & 128) != 0); 00299 printf( "variable length %d, index %d\n", value, index ); 00300 00301 // uint8_t tl = (buffer[2]<<3)+buffer[3]; 00302 uint8_t tl = (buffer[index]<<3)+buffer[index+1]; 00303 printf("Topic len %d\n",tl); 00304 char topic[tl+1]; 00305 for (int i=0; i<tl; i++) { 00306 topic[i] = buffer[index+2+i]; 00307 } 00308 topic[tl] = 0; 00309 // ignore msgID - only support QoS 0 subs 00310 char *payload = buffer+index+2+tl; 00311 callback_server(topic,(char*)payload); 00312 } 00313 } else if (type == 12) { // PINGREG -- Ask for alive 00314 char packet_208[] = {0xd0, 0x00}; 00315 send_data((char*)packet_208, 2); 00316 lastActivity = timer.read_ms(); 00317 } 00318 } 00319 00320 /** Check for session opened 00321 */ 00322 void MQTTClient::read_open_session() { 00323 char buffer[32]; 00324 int len = 0, readLen; 00325 00326 while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) { 00327 len += readLen; 00328 } 00329 00330 if (len == 4 && buffer[3] == 0) { 00331 printf("Session opened\r\n"); 00332 sessionOpened = true; 00333 } 00334 } 00335 00336 /** Subscribe to a topic 00337 * 00338 * @param topic The topic name to be subscribed. 00339 * @returns -1: Failed to subscribe to topic. 1: Subscribe sucessed. 00340 */ 00341 int MQTTClient::subscribe(char* topic) { 00342 00343 if (connected) { 00344 uint8_t var_header_topic[] = {0,10}; 00345 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3}; 00346 00347 //utf topic 00348 uint8_t utf_topic[strlen(topic)+3]; 00349 strcpy((char *)&utf_topic[2], topic); 00350 00351 utf_topic[0] = 0; 00352 utf_topic[1] = strlen(topic); 00353 utf_topic[sizeof(utf_topic)-1] = 0; 00354 00355 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3]; 00356 memset(packet_topic,0,sizeof(packet_topic)); 00357 memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic)); 00358 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic)); 00359 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic)); 00360 00361 if (!send_data(packet_topic, sizeof(packet_topic))) { 00362 return -1; 00363 } 00364 return 1; 00365 } 00366 return -1; 00367 } 00368 00369 /** Send heartbeat/keep-alive message 00370 * 00371 */ 00372 void MQTTClient::live() { 00373 if (connected) { 00374 int t = timer.read_ms(); 00375 if (t - lastActivity > KEEPALIVE) { 00376 //Send 192 0 to broker 00377 printf("Send 192\r\n"); 00378 char packet_192[] = {0xc0, 0x00}; 00379 send_data((char*)packet_192, 2); 00380 lastActivity = t; 00381 } 00382 } 00383 } 00384 00385 /** TCP Socket event handling 00386 * 00387 * @param e Event object 00388 */ 00389 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) { 00390 switch (e) { 00391 case TCPSOCKET_ACCEPT: 00392 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n"); 00393 break; 00394 case TCPSOCKET_CONNECTED: 00395 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n"); 00396 connected = true; 00397 break; 00398 case TCPSOCKET_WRITEABLE: 00399 printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); 00400 break; 00401 case TCPSOCKET_READABLE: 00402 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n"); 00403 if (!sessionOpened) { 00404 read_open_session(); 00405 } 00406 read_data(); 00407 break; 00408 case TCPSOCKET_CONTIMEOUT: 00409 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n"); 00410 break; 00411 case TCPSOCKET_CONRST: 00412 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n"); 00413 break; 00414 case TCPSOCKET_CONABRT: 00415 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n"); 00416 break; 00417 case TCPSOCKET_ERROR: 00418 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n"); 00419 break; 00420 case TCPSOCKET_DISCONNECTED: 00421 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n"); 00422 pTCPSocket->close(); 00423 connected = false; 00424 break; 00425 } 00426 }
Generated on Fri Jul 15 2022 10:44:02 by 1.7.2