My goal with this project was to create a quick operations console to interface with embedded hardware for messaging and switching. As I am part of an active project called Kivy.org I figured it would be a perfect tool to provide the user interface for the console. Kivy is a cross platform development framework based on Python and the C language. For further reading I urge you to click over to the http://kivy.org website and download a copy of the framework for your operating system and run some examples. The API is ultra easy and very feature rich including 2D & 3D support with standard widget libraries and Advanced multitouch gesturing support that all runs in the hardware GPU on Microsoft windows operating systems, Apple Mac OSx operating systems including IOS for the iPhone and iPad as well as support for Android and Linux. The example code I am providing in this blog entry can easily be built simultaneously for each of the above operating systems. See http://mctouch.wordpress.com

Dependencies:   EthernetNetIf TextLCD mbed

Committer:
mctouch
Date:
Thu Jun 28 21:29:07 2012 +0000
Revision:
0:7852eddd2a3c
For blog entry

Who changed what in which revision?

UserRevisionLine numberNew contents of line
mctouch 0:7852eddd2a3c 1 #include "MQTTClient.h"
mctouch 0:7852eddd2a3c 2
mctouch 0:7852eddd2a3c 3 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*))
mctouch 0:7852eddd2a3c 4 {
mctouch 0:7852eddd2a3c 5 this->port = port;
mctouch 0:7852eddd2a3c 6 callback_server = callback;
mctouch 0:7852eddd2a3c 7 serverIp = server;
mctouch 0:7852eddd2a3c 8 connected = false;
mctouch 0:7852eddd2a3c 9 sessionOpened = false;
mctouch 0:7852eddd2a3c 10 timer.start();
mctouch 0:7852eddd2a3c 11 }
mctouch 0:7852eddd2a3c 12
mctouch 0:7852eddd2a3c 13 MQTTClient::~MQTTClient()
mctouch 0:7852eddd2a3c 14 {
mctouch 0:7852eddd2a3c 15 }
mctouch 0:7852eddd2a3c 16
mctouch 0:7852eddd2a3c 17 int MQTTClient::send_data(const char* msg, int size)
mctouch 0:7852eddd2a3c 18 {
mctouch 0:7852eddd2a3c 19 int transLen = pTCPSocket->send(msg, size);
mctouch 0:7852eddd2a3c 20
mctouch 0:7852eddd2a3c 21 /*Check send length matches the message length*/
mctouch 0:7852eddd2a3c 22 if(transLen != size){
mctouch 0:7852eddd2a3c 23 for(int i = 0; i < size; i++) {
mctouch 0:7852eddd2a3c 24 printf("%x, ", msg[i]);
mctouch 0:7852eddd2a3c 25 }
mctouch 0:7852eddd2a3c 26 printf("Error on send.\r\n");
mctouch 0:7852eddd2a3c 27 return -1;
mctouch 0:7852eddd2a3c 28 }
mctouch 0:7852eddd2a3c 29 return 1;
mctouch 0:7852eddd2a3c 30 }
mctouch 0:7852eddd2a3c 31
mctouch 0:7852eddd2a3c 32 int MQTTClient::open_session(char* id)
mctouch 0:7852eddd2a3c 33 {
mctouch 0:7852eddd2a3c 34 /*variable header*/
mctouch 0:7852eddd2a3c 35 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
mctouch 0:7852eddd2a3c 36
mctouch 0:7852eddd2a3c 37 /*fixed header: 2 bytes, big endian*/
mctouch 0:7852eddd2a3c 38 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
mctouch 0:7852eddd2a3c 39
mctouch 0:7852eddd2a3c 40 char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)];
mctouch 0:7852eddd2a3c 41
mctouch 0:7852eddd2a3c 42 memset(packet,0,sizeof(packet));
mctouch 0:7852eddd2a3c 43 memcpy(packet,fixed_header,sizeof(fixed_header));
mctouch 0:7852eddd2a3c 44 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
mctouch 0:7852eddd2a3c 45 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
mctouch 0:7852eddd2a3c 46
mctouch 0:7852eddd2a3c 47 /*
mctouch 0:7852eddd2a3c 48 for(int i = 0; i < sizeof(packet); i++) {
mctouch 0:7852eddd2a3c 49 printf("%x, ", packet[i]);
mctouch 0:7852eddd2a3c 50 }
mctouch 0:7852eddd2a3c 51 printf("\r\n");*/
mctouch 0:7852eddd2a3c 52
mctouch 0:7852eddd2a3c 53 if(!send_data(packet, sizeof(packet)))
mctouch 0:7852eddd2a3c 54 {
mctouch 0:7852eddd2a3c 55 return -1;
mctouch 0:7852eddd2a3c 56 }
mctouch 0:7852eddd2a3c 57 return 1;
mctouch 0:7852eddd2a3c 58 }
mctouch 0:7852eddd2a3c 59
mctouch 0:7852eddd2a3c 60 int MQTTClient::connect(char* id)
mctouch 0:7852eddd2a3c 61 {
mctouch 0:7852eddd2a3c 62 clientId = id;
mctouch 0:7852eddd2a3c 63
mctouch 0:7852eddd2a3c 64 /*Initial TCP socket*/
mctouch 0:7852eddd2a3c 65 pTCPSocket = new TCPSocket;
mctouch 0:7852eddd2a3c 66 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
mctouch 0:7852eddd2a3c 67
mctouch 0:7852eddd2a3c 68 host.setPort(1883);
mctouch 0:7852eddd2a3c 69 host.setIp(serverIp);
mctouch 0:7852eddd2a3c 70 host.setName("localhost");
mctouch 0:7852eddd2a3c 71
mctouch 0:7852eddd2a3c 72 /*Trying to connect to host*/
mctouch 0:7852eddd2a3c 73 printf("Trying to connect to host..\r\n\r\n");
mctouch 0:7852eddd2a3c 74 TCPSocketErr err = pTCPSocket->connect(host);
mctouch 0:7852eddd2a3c 75
mctouch 0:7852eddd2a3c 76 Net::poll();
mctouch 0:7852eddd2a3c 77 if(err)
mctouch 0:7852eddd2a3c 78 {
mctouch 0:7852eddd2a3c 79 printf("Error connecting to host [%d]\r\n", (int) err);
mctouch 0:7852eddd2a3c 80 return -1;
mctouch 0:7852eddd2a3c 81 }
mctouch 0:7852eddd2a3c 82 printf("Connect to host sucessed..\r\n\r\n");
mctouch 0:7852eddd2a3c 83
mctouch 0:7852eddd2a3c 84 /*Wait TCP connection with server to be established*/
mctouch 0:7852eddd2a3c 85 int i = 0;
mctouch 0:7852eddd2a3c 86 while(!connected)
mctouch 0:7852eddd2a3c 87 {
mctouch 0:7852eddd2a3c 88 Net::poll();
mctouch 0:7852eddd2a3c 89 wait(1);
mctouch 0:7852eddd2a3c 90 i++;
mctouch 0:7852eddd2a3c 91 printf("Wait for connections %d..\r\n", i);
mctouch 0:7852eddd2a3c 92 if(i == 35)
mctouch 0:7852eddd2a3c 93 {//If wait too long, give up.
mctouch 0:7852eddd2a3c 94 return -1;
mctouch 0:7852eddd2a3c 95 }
mctouch 0:7852eddd2a3c 96 }
mctouch 0:7852eddd2a3c 97
mctouch 0:7852eddd2a3c 98 /*Send open session message to server*/
mctouch 0:7852eddd2a3c 99 open_session(id);
mctouch 0:7852eddd2a3c 100
mctouch 0:7852eddd2a3c 101 /*Wait server notice of open sesion*/
mctouch 0:7852eddd2a3c 102 while(!sessionOpened)
mctouch 0:7852eddd2a3c 103 {
mctouch 0:7852eddd2a3c 104 Net::poll();
mctouch 0:7852eddd2a3c 105 wait(1);
mctouch 0:7852eddd2a3c 106 if(!connected){
mctouch 0:7852eddd2a3c 107 break;
mctouch 0:7852eddd2a3c 108 }
mctouch 0:7852eddd2a3c 109 printf("Wait for session..\r\n");
mctouch 0:7852eddd2a3c 110 }
mctouch 0:7852eddd2a3c 111 if(!connected)
mctouch 0:7852eddd2a3c 112 {
mctouch 0:7852eddd2a3c 113 return -2;
mctouch 0:7852eddd2a3c 114 }
mctouch 0:7852eddd2a3c 115 lastActivity = timer.read_ms();
mctouch 0:7852eddd2a3c 116 return 1;
mctouch 0:7852eddd2a3c 117 }
mctouch 0:7852eddd2a3c 118
mctouch 0:7852eddd2a3c 119 int MQTTClient::publish(char* pub_topic, char* msg)
mctouch 0:7852eddd2a3c 120 {
mctouch 0:7852eddd2a3c 121 uint8_t var_header_pub[strlen(pub_topic)+3];
mctouch 0:7852eddd2a3c 122 strcpy((char *)&var_header_pub[2], pub_topic);
mctouch 0:7852eddd2a3c 123 var_header_pub[0] = 0;
mctouch 0:7852eddd2a3c 124 var_header_pub[1] = strlen(pub_topic);
mctouch 0:7852eddd2a3c 125 var_header_pub[sizeof(var_header_pub)-1] = 0;
mctouch 0:7852eddd2a3c 126
mctouch 0:7852eddd2a3c 127 uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
mctouch 0:7852eddd2a3c 128
mctouch 0:7852eddd2a3c 129 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
mctouch 0:7852eddd2a3c 130 memset(packet_pub,0,sizeof(packet_pub));
mctouch 0:7852eddd2a3c 131 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
mctouch 0:7852eddd2a3c 132 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
mctouch 0:7852eddd2a3c 133 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
mctouch 0:7852eddd2a3c 134
mctouch 0:7852eddd2a3c 135 if(!send_data((char*)packet_pub, sizeof(packet_pub)))
mctouch 0:7852eddd2a3c 136 {
mctouch 0:7852eddd2a3c 137 return -1;
mctouch 0:7852eddd2a3c 138 }
mctouch 0:7852eddd2a3c 139 return 1;
mctouch 0:7852eddd2a3c 140 }
mctouch 0:7852eddd2a3c 141
mctouch 0:7852eddd2a3c 142
mctouch 0:7852eddd2a3c 143 void MQTTClient::disconnect()
mctouch 0:7852eddd2a3c 144 {
mctouch 0:7852eddd2a3c 145 char packet_224[] = {0xe0, 0x00};
mctouch 0:7852eddd2a3c 146 send_data((char*)packet_224, 2);
mctouch 0:7852eddd2a3c 147
mctouch 0:7852eddd2a3c 148 connected = false;
mctouch 0:7852eddd2a3c 149 }
mctouch 0:7852eddd2a3c 150
mctouch 0:7852eddd2a3c 151 void MQTTClient::read_data()
mctouch 0:7852eddd2a3c 152 {
mctouch 0:7852eddd2a3c 153 char buffer[1024];
mctouch 0:7852eddd2a3c 154 int len = 0, readLen;
mctouch 0:7852eddd2a3c 155
mctouch 0:7852eddd2a3c 156 while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){
mctouch 0:7852eddd2a3c 157 len += readLen;
mctouch 0:7852eddd2a3c 158 }
mctouch 0:7852eddd2a3c 159
mctouch 0:7852eddd2a3c 160 buffer[len] = '\0';
mctouch 0:7852eddd2a3c 161
mctouch 0:7852eddd2a3c 162 /*
mctouch 0:7852eddd2a3c 163 printf("Read length: %d %d\r\n", len, readLen);
mctouch 0:7852eddd2a3c 164
mctouch 0:7852eddd2a3c 165 for(int i = 0; i < len; i++) {
mctouch 0:7852eddd2a3c 166 printf("%x\r\n", buffer[i],buffer[i]);
mctouch 0:7852eddd2a3c 167 }
mctouch 0:7852eddd2a3c 168 printf("\r\n");
mctouch 0:7852eddd2a3c 169 */
mctouch 0:7852eddd2a3c 170 char type = buffer[0]>>4;
mctouch 0:7852eddd2a3c 171 if (type == 3) { // PUBLISH
mctouch 0:7852eddd2a3c 172 if (callback_server) {
mctouch 0:7852eddd2a3c 173 uint8_t tl = (buffer[2]<<3)+buffer[3];
mctouch 0:7852eddd2a3c 174 char topic[tl+1];
mctouch 0:7852eddd2a3c 175 for (int i=0;i<tl;i++) {
mctouch 0:7852eddd2a3c 176 topic[i] = buffer[4+i];
mctouch 0:7852eddd2a3c 177 }
mctouch 0:7852eddd2a3c 178 topic[tl] = 0;
mctouch 0:7852eddd2a3c 179 // ignore msgID - only support QoS 0 subs
mctouch 0:7852eddd2a3c 180 char *payload = buffer+4+tl;
mctouch 0:7852eddd2a3c 181 callback_server(topic,(char*)payload);
mctouch 0:7852eddd2a3c 182 }
mctouch 0:7852eddd2a3c 183 } else if (type == 12) { // PINGREG -- Ask for alive
mctouch 0:7852eddd2a3c 184 char packet_208[] = {0xd0, 0x00};
mctouch 0:7852eddd2a3c 185 send_data((char*)packet_208, 2);
mctouch 0:7852eddd2a3c 186 lastActivity = timer.read_ms();
mctouch 0:7852eddd2a3c 187 }
mctouch 0:7852eddd2a3c 188 }
mctouch 0:7852eddd2a3c 189
mctouch 0:7852eddd2a3c 190 void MQTTClient::read_open_session()
mctouch 0:7852eddd2a3c 191 {
mctouch 0:7852eddd2a3c 192 char buffer[32];
mctouch 0:7852eddd2a3c 193 int len = 0, readLen;
mctouch 0:7852eddd2a3c 194
mctouch 0:7852eddd2a3c 195 while((readLen = pTCPSocket->recv(buffer, 32)) != 0)
mctouch 0:7852eddd2a3c 196 {
mctouch 0:7852eddd2a3c 197 len += readLen;
mctouch 0:7852eddd2a3c 198 }
mctouch 0:7852eddd2a3c 199
mctouch 0:7852eddd2a3c 200 if(len == 4 && buffer[3] == 0)
mctouch 0:7852eddd2a3c 201 {
mctouch 0:7852eddd2a3c 202 printf("Session opened\r\n");
mctouch 0:7852eddd2a3c 203 sessionOpened = true;
mctouch 0:7852eddd2a3c 204 }
mctouch 0:7852eddd2a3c 205 }
mctouch 0:7852eddd2a3c 206
mctouch 0:7852eddd2a3c 207 int MQTTClient::subscribe(char* topic)
mctouch 0:7852eddd2a3c 208 {
mctouch 0:7852eddd2a3c 209
mctouch 0:7852eddd2a3c 210 if (connected)
mctouch 0:7852eddd2a3c 211 {
mctouch 0:7852eddd2a3c 212 uint8_t var_header_topic[] = {0,10};
mctouch 0:7852eddd2a3c 213 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
mctouch 0:7852eddd2a3c 214
mctouch 0:7852eddd2a3c 215 //utf topic
mctouch 0:7852eddd2a3c 216 uint8_t utf_topic[strlen(topic)+3];
mctouch 0:7852eddd2a3c 217 strcpy((char *)&utf_topic[2], topic);
mctouch 0:7852eddd2a3c 218
mctouch 0:7852eddd2a3c 219 utf_topic[0] = 0;
mctouch 0:7852eddd2a3c 220 utf_topic[1] = strlen(topic);
mctouch 0:7852eddd2a3c 221 utf_topic[sizeof(utf_topic)-1] = 0;
mctouch 0:7852eddd2a3c 222
mctouch 0:7852eddd2a3c 223 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
mctouch 0:7852eddd2a3c 224 memset(packet_topic,0,sizeof(packet_topic));
mctouch 0:7852eddd2a3c 225 memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
mctouch 0:7852eddd2a3c 226 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
mctouch 0:7852eddd2a3c 227 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
mctouch 0:7852eddd2a3c 228
mctouch 0:7852eddd2a3c 229 if (!send_data(packet_topic, sizeof(packet_topic)))
mctouch 0:7852eddd2a3c 230 {
mctouch 0:7852eddd2a3c 231 return -1;
mctouch 0:7852eddd2a3c 232 }
mctouch 0:7852eddd2a3c 233 return 1;
mctouch 0:7852eddd2a3c 234 }
mctouch 0:7852eddd2a3c 235 return -1;
mctouch 0:7852eddd2a3c 236 }
mctouch 0:7852eddd2a3c 237
mctouch 0:7852eddd2a3c 238 void MQTTClient::live()
mctouch 0:7852eddd2a3c 239 {
mctouch 0:7852eddd2a3c 240 if (connected)
mctouch 0:7852eddd2a3c 241 {
mctouch 0:7852eddd2a3c 242 int t = timer.read_ms();
mctouch 0:7852eddd2a3c 243 if (t - lastActivity > KEEPALIVE) {
mctouch 0:7852eddd2a3c 244 //Send 192 0 to broker
mctouch 0:7852eddd2a3c 245 printf("Send 192\r\n");
mctouch 0:7852eddd2a3c 246 char packet_192[] = {0xc0, 0x00};
mctouch 0:7852eddd2a3c 247 send_data((char*)packet_192, 2);
mctouch 0:7852eddd2a3c 248 lastActivity = t;
mctouch 0:7852eddd2a3c 249 }
mctouch 0:7852eddd2a3c 250 }
mctouch 0:7852eddd2a3c 251 }
mctouch 0:7852eddd2a3c 252
mctouch 0:7852eddd2a3c 253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e)
mctouch 0:7852eddd2a3c 254 {
mctouch 0:7852eddd2a3c 255 switch(e)
mctouch 0:7852eddd2a3c 256 {
mctouch 0:7852eddd2a3c 257 case TCPSOCKET_ACCEPT:
mctouch 0:7852eddd2a3c 258 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
mctouch 0:7852eddd2a3c 259 break;
mctouch 0:7852eddd2a3c 260 case TCPSOCKET_CONNECTED:
mctouch 0:7852eddd2a3c 261 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
mctouch 0:7852eddd2a3c 262 connected = true;
mctouch 0:7852eddd2a3c 263 break;
mctouch 0:7852eddd2a3c 264 case TCPSOCKET_WRITEABLE:
mctouch 0:7852eddd2a3c 265 printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
mctouch 0:7852eddd2a3c 266 break;
mctouch 0:7852eddd2a3c 267 case TCPSOCKET_READABLE:
mctouch 0:7852eddd2a3c 268 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
mctouch 0:7852eddd2a3c 269 if(!sessionOpened)
mctouch 0:7852eddd2a3c 270 {
mctouch 0:7852eddd2a3c 271 read_open_session();
mctouch 0:7852eddd2a3c 272 }
mctouch 0:7852eddd2a3c 273 read_data();
mctouch 0:7852eddd2a3c 274 break;
mctouch 0:7852eddd2a3c 275 case TCPSOCKET_CONTIMEOUT:
mctouch 0:7852eddd2a3c 276 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
mctouch 0:7852eddd2a3c 277 break;
mctouch 0:7852eddd2a3c 278 case TCPSOCKET_CONRST:
mctouch 0:7852eddd2a3c 279 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
mctouch 0:7852eddd2a3c 280 break;
mctouch 0:7852eddd2a3c 281 case TCPSOCKET_CONABRT:
mctouch 0:7852eddd2a3c 282 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
mctouch 0:7852eddd2a3c 283 break;
mctouch 0:7852eddd2a3c 284 case TCPSOCKET_ERROR:
mctouch 0:7852eddd2a3c 285 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
mctouch 0:7852eddd2a3c 286 break;
mctouch 0:7852eddd2a3c 287 case TCPSOCKET_DISCONNECTED:
mctouch 0:7852eddd2a3c 288 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
mctouch 0:7852eddd2a3c 289 pTCPSocket->close();
mctouch 0:7852eddd2a3c 290 connected = false;
mctouch 0:7852eddd2a3c 291 break;
mctouch 0:7852eddd2a3c 292 }
mctouch 0:7852eddd2a3c 293 }