MQTTClient

Dependents:   IoTGateway_Basic test

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 /*
00002 MQTTClient.cpp
00003 Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
00004 A simple MQTT client for mbed, version 2.0
00005 By Yilun FAN, @CEIT, @JAN 2011
00006 
00007 Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
00008 
00009 Permission is hereby granted, free of charge, to any person obtaining a copy
00010 of this software and associated documentation files (the "Software"), to deal
00011 in the Software without restriction, including without limitation the rights
00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00013 copies of the Software, and to permit persons to whom the Software is
00014 furnished to do so, subject to the following conditions:
00015 
00016 The above copyright notice and this permission notice shall be included in
00017 all copies or substantial portions of the Software.
00018 
00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00025 THE SOFTWARE.
00026 */
00027 
00028 #include "MQTTClient.h"
00029 
00030 /** Default Constructor
00031  */
00032 MQTTClient::MQTTClient() {}
00033 
00034 /** Default Destructor
00035  */
00036 MQTTClient::~MQTTClient() {}
00037 
00038 /** Alternative Constructor with parameters
00039  *
00040  * Allow object to be constructed with minimum parameters.
00041  *
00042  * @param server The IP address of the server to connect to
00043  * @param port   The TCP/IP port on the server to connect to
00044  * @param callback Callback function to handle subscription to topics
00045  */
00046 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
00047     this->port = port;
00048     callback_server = callback;
00049     serverIp = server;
00050     this->userName = NULL;
00051     this->password = NULL;
00052     connected = false;
00053     sessionOpened = false;
00054     timer.start();
00055 }
00056 
00057 
00058 /** MQTT initialisation method
00059  *
00060  * Used when default constructor used and need to specify parameters at runtime
00061  *
00062  * @param server The IP address of the server to connect to
00063  * @param port   The TCP/IP port on the server to connect to
00064  * @param callback Callback function to handle subscription to topics
00065  */
00066 void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) {
00067     this->port = port;
00068     callback_server = callback;
00069     serverIp = *server;
00070     this->userName = NULL;
00071     this->password = NULL;
00072     connected = false;
00073     sessionOpened = false;
00074     timer.start();
00075 }
00076 
00077 /** A brief description of the function foo
00078  *
00079  * More details about the function goes here
00080  * and here
00081  *
00082  * @param server The IP address of the server to connect to
00083  * @param port   The TCP/IP port on the server to connect to
00084  * @param userName Pointer to username string, zero terminated. Null for not used
00085  * @param password Pointer to password string, zero terminated. Null for not used
00086  * @param callback Callback function to handle subscription to topics
00087  */
00088 void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) {
00089     this->port = port;
00090     callback_server = callback;
00091     serverIp = *server;
00092     this->userName = userName;
00093     this->password = password;
00094     connected = false;
00095     sessionOpened = false;
00096     timer.start();
00097 }
00098 
00099 /** Send a message of specified size
00100  *
00101  * @param msg  message string
00102  * @param size Size of the message to send
00103  * @returns value to indicate message sent successfully or not, -1 for error, 1 for success.
00104  */
00105 int MQTTClient::send_data(const char* msg, int size) {
00106     int transLen = pTCPSocket->send(msg, size);
00107 
00108     /*Check send length matches the message length*/
00109     if (transLen != size) {
00110         for (int i = 0; i < size; i++) {
00111             printf("%x, ", msg[i]);
00112         }
00113         printf("Error on send.\r\n");
00114         return -1;
00115     }
00116     return 1;
00117 }
00118 
00119 /** Start a MQTT session, build CONNECT packet
00120  *
00121  * @param id The client name shown on MQTT server.
00122  * @returns -1 for error, 1 for success
00123  */
00124 int MQTTClient::open_session(char* id) {
00125     /*variable header*/
00126     char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
00127 
00128     /*fixed header: 2 bytes, big endian*/
00129     char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
00130     short usernameLen = strlen( userName );
00131     short passwordLen = strlen( password );
00132     var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
00133     var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
00134 
00135 //    printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen );
00136     char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + 6 + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
00137 
00138     memset(packet,0,sizeof(packet));
00139     memcpy(packet,fixed_header,sizeof(fixed_header));
00140     memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
00141     memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
00142     if ( usernameLen > 0 ) {
00143         packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00;
00144         packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen;
00145         memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen);
00146         packet[1] += (usernameLen + 2);
00147     }
00148     if ( passwordLen > 0 ) {
00149         packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00;
00150         packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen;
00151         memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen);
00152         packet[1] += (passwordLen + 2);
00153     }
00154 
00155     /*    printf("Packet length %d\n", sizeof(packet));
00156         for (int i = 0; i < sizeof(packet); i++) {
00157             printf("%x, ", packet[i]);
00158         }
00159         printf("\r\n");
00160     */
00161     if (!send_data(packet, sizeof(packet))) {
00162         return -1;
00163     }
00164 //    printf("Sent\n");
00165     return 1;
00166 }
00167 
00168 /** Open TCP port, connect to server on given IP address.
00169  *
00170  * @param id The client name shown on MQTT server.
00171  * @returns -1: If connect to server failed. -2: Failed to open session on server.  1: Connection accessed.
00172  */
00173 int MQTTClient::connect(char* id) {
00174     clientId = id;
00175 
00176     /*Initial TCP socket*/
00177     pTCPSocket = new TCPSocket;
00178     pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
00179 
00180     host.setPort(port);
00181     host.setIp(serverIp);
00182     host.setName("localhost");
00183 
00184     /*Trying to connect to host*/
00185     printf("Trying to connect to host..\r\n\r\n");
00186     TCPSocketErr err = pTCPSocket->connect(host);
00187 
00188     Net::poll();
00189     if (err) {
00190         printf("Error connecting to host [%d]\r\n", (int) err);
00191         return -1;
00192     }
00193     printf("Connect to host..\r\n\r\n");
00194 
00195     /*Wait TCP connection with server to be established*/
00196     int i = 0;
00197     while (!connected) {
00198         Net::poll();
00199         wait(1);
00200         i++;
00201         printf("Wait for connections %d..\r\n", i);
00202         if (i == 35) {//If wait too long, give up.
00203             return -1;
00204         }
00205     }
00206 
00207     /*Send open session message to server*/
00208     open_session(id);
00209 
00210     /*Wait server notice of open sesion*/
00211     i = 0;
00212     while (!sessionOpened) {
00213         Net::poll();
00214         wait(1);
00215         if (!connected || ++i >40) {
00216             break;
00217         }
00218         printf("Wait for session %d..\r\n", i);
00219     }
00220     if (!connected) {
00221         return -2;
00222     }
00223     lastActivity = timer.read_ms();
00224     return 1;
00225 }
00226 
00227 /** Publish a message on a topic.
00228  *
00229  * @param pub_topic  The topic name the massage will be publish on.
00230  * @param msg        The massage to be published.
00231  * @returns  -1: Failed to publish message. 1: Publish sucessed.
00232  */
00233 int MQTTClient::publish(char* pub_topic, char* msg) {
00234     uint8_t var_header_pub[strlen(pub_topic)+3];
00235     strcpy((char *)&var_header_pub[2], pub_topic);
00236     var_header_pub[0] = 0;
00237     var_header_pub[1] = strlen(pub_topic);
00238     var_header_pub[sizeof(var_header_pub)-1] = 0;
00239 
00240     uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
00241 
00242     uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
00243     memset(packet_pub,0,sizeof(packet_pub));
00244     memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
00245     memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
00246     memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
00247 
00248     if (!send_data((char*)packet_pub, sizeof(packet_pub))) {
00249         return -1;
00250     }
00251     return 1;
00252 }
00253 
00254 /** Disconnect from server
00255  */
00256 void MQTTClient::disconnect() {
00257     char packet_224[] = {0xe0, 0x00};
00258     send_data((char*)packet_224, 2);
00259 
00260     connected = false;
00261 }
00262 
00263 /** Read data from receive packet
00264  * Determine what needs to be done with packet.
00265  */
00266 void MQTTClient::read_data() {
00267     char buffer[1024];
00268     int len = 0, readLen;
00269 
00270     while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) {
00271         len += readLen;
00272     }
00273 
00274     buffer[len] = '\0';
00275 
00276 
00277     printf("Read length: %d %d\r\n", len, readLen);
00278 
00279     for (int i = 0; i < len; i++) {
00280         printf("%2X ", buffer[i]);
00281     }
00282     printf("\r\n");
00283 
00284     char type = buffer[0]>>4;
00285     if ( type == 2 ) { // CONNACK
00286         printf("CONNACK\n");
00287 
00288     } else if (type == 3) { // PUBLISH
00289         if (callback_server) {
00290             short index = 1;
00291             short multiplier = 1;
00292             short value = 0;
00293             uint8_t  digit;
00294             do {
00295                 digit = buffer[index++];
00296                 value += (digit & 127) * multiplier;
00297                 multiplier *= 128;
00298             } while ((digit & 128) != 0);
00299             printf( "variable length %d, index %d\n", value, index );
00300 
00301 //            uint8_t tl = (buffer[2]<<3)+buffer[3];
00302             uint8_t tl = (buffer[index]<<3)+buffer[index+1];
00303             printf("Topic len %d\n",tl);
00304             char topic[tl+1];
00305             for (int i=0; i<tl; i++) {
00306                 topic[i] = buffer[index+2+i];
00307             }
00308             topic[tl] = 0;
00309             // ignore msgID - only support QoS 0 subs
00310             char *payload = buffer+index+2+tl;
00311             callback_server(topic,(char*)payload);
00312         }
00313     } else if (type == 12) { // PINGREG -- Ask for alive
00314         char packet_208[] = {0xd0, 0x00};
00315         send_data((char*)packet_208, 2);
00316         lastActivity = timer.read_ms();
00317     }
00318 }
00319 
00320 /** Check for session opened
00321  */
00322 void MQTTClient::read_open_session() {
00323     char buffer[32];
00324     int len = 0, readLen;
00325 
00326     while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) {
00327         len += readLen;
00328     }
00329 
00330     if (len == 4 && buffer[3] == 0) {
00331         printf("Session opened\r\n");
00332         sessionOpened = true;
00333     }
00334 }
00335 
00336 /** Subscribe to a topic
00337  *
00338  * @param topic The topic name to be subscribed.
00339  * @returns  -1: Failed to subscribe to topic. 1: Subscribe sucessed.
00340  */
00341 int MQTTClient::subscribe(char* topic) {
00342 
00343     if (connected) {
00344         uint8_t var_header_topic[] = {0,10};
00345         uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
00346 
00347         //utf topic
00348         uint8_t utf_topic[strlen(topic)+3];
00349         strcpy((char *)&utf_topic[2], topic);
00350 
00351         utf_topic[0] = 0;
00352         utf_topic[1] = strlen(topic);
00353         utf_topic[sizeof(utf_topic)-1] = 0;
00354 
00355         char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
00356         memset(packet_topic,0,sizeof(packet_topic));
00357         memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
00358         memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
00359         memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
00360 
00361         if (!send_data(packet_topic, sizeof(packet_topic))) {
00362             return -1;
00363         }
00364         return 1;
00365     }
00366     return -1;
00367 }
00368 
00369 /** Send heartbeat/keep-alive message
00370  *
00371  */
00372 void MQTTClient::live() {
00373     if (connected) {
00374         int t = timer.read_ms();
00375         if (t - lastActivity > KEEPALIVE) {
00376             //Send 192 0 to broker
00377             printf("Send 192\r\n");
00378             char packet_192[] = {0xc0, 0x00};
00379             send_data((char*)packet_192, 2);
00380             lastActivity = t;
00381         }
00382     }
00383 }
00384 
00385 /** TCP Socket event handling
00386  *
00387  * @param e  Event object
00388  */
00389 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) {
00390     switch (e) {
00391         case TCPSOCKET_ACCEPT:
00392             printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
00393             break;
00394         case TCPSOCKET_CONNECTED:
00395             printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
00396             connected = true;
00397             break;
00398         case TCPSOCKET_WRITEABLE:
00399             printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
00400             break;
00401         case TCPSOCKET_READABLE:
00402             printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
00403             if (!sessionOpened) {
00404                 read_open_session();
00405             }
00406             read_data();
00407             break;
00408         case TCPSOCKET_CONTIMEOUT:
00409             printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
00410             break;
00411         case TCPSOCKET_CONRST:
00412             printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
00413             break;
00414         case TCPSOCKET_CONABRT:
00415             printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
00416             break;
00417         case TCPSOCKET_ERROR:
00418             printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
00419             break;
00420         case TCPSOCKET_DISCONNECTED:
00421             printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
00422             pTCPSocket->close();
00423             connected = false;
00424             break;
00425     }
00426 }