An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Committer:
icraggs
Date:
Mon Apr 14 18:51:52 2014 +0000
Revision:
16:91c2f9a144d4
Add subscriptions

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 16:91c2f9a144d4 1 /*******************************************************************************
icraggs 16:91c2f9a144d4 2 * Copyright (c) 2014 IBM Corp.
icraggs 16:91c2f9a144d4 3 *
icraggs 16:91c2f9a144d4 4 * All rights reserved. This program and the accompanying materials
icraggs 16:91c2f9a144d4 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 16:91c2f9a144d4 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 16:91c2f9a144d4 7 *
icraggs 16:91c2f9a144d4 8 * The Eclipse Public License is available at
icraggs 16:91c2f9a144d4 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 16:91c2f9a144d4 10 * and the Eclipse Distribution License is available at
icraggs 16:91c2f9a144d4 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 16:91c2f9a144d4 12 *
icraggs 16:91c2f9a144d4 13 * Contributors:
icraggs 16:91c2f9a144d4 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 16:91c2f9a144d4 15 *******************************************************************************/
icraggs 16:91c2f9a144d4 16
icraggs 16:91c2f9a144d4 17 #if !defined(MQTTSINGLE_H)
icraggs 16:91c2f9a144d4 18 #define MQTTSINGLE_H
icraggs 16:91c2f9a144d4 19
icraggs 16:91c2f9a144d4 20 #include "FP.h"
icraggs 16:91c2f9a144d4 21 #include "MQTTPacket.h"
icraggs 16:91c2f9a144d4 22 #include "stdio.h"
icraggs 16:91c2f9a144d4 23
icraggs 16:91c2f9a144d4 24 namespace MQTT
icraggs 16:91c2f9a144d4 25 {
icraggs 16:91c2f9a144d4 26
icraggs 16:91c2f9a144d4 27
icraggs 16:91c2f9a144d4 28 enum QoS { QOS0, QOS1, QOS2 };
icraggs 16:91c2f9a144d4 29
icraggs 16:91c2f9a144d4 30
icraggs 16:91c2f9a144d4 31 struct Message
icraggs 16:91c2f9a144d4 32 {
icraggs 16:91c2f9a144d4 33 enum QoS qos;
icraggs 16:91c2f9a144d4 34 bool retained;
icraggs 16:91c2f9a144d4 35 bool dup;
icraggs 16:91c2f9a144d4 36 unsigned short id;
icraggs 16:91c2f9a144d4 37 void *payload;
icraggs 16:91c2f9a144d4 38 size_t payloadlen;
icraggs 16:91c2f9a144d4 39 };
icraggs 16:91c2f9a144d4 40
icraggs 16:91c2f9a144d4 41
icraggs 16:91c2f9a144d4 42 class PacketId
icraggs 16:91c2f9a144d4 43 {
icraggs 16:91c2f9a144d4 44 public:
icraggs 16:91c2f9a144d4 45 PacketId();
icraggs 16:91c2f9a144d4 46
icraggs 16:91c2f9a144d4 47 int getNext();
icraggs 16:91c2f9a144d4 48
icraggs 16:91c2f9a144d4 49 private:
icraggs 16:91c2f9a144d4 50 static const int MAX_PACKET_ID = 65535;
icraggs 16:91c2f9a144d4 51 int next;
icraggs 16:91c2f9a144d4 52 };
icraggs 16:91c2f9a144d4 53
icraggs 16:91c2f9a144d4 54 typedef void (*messageHandler)(Message*);
icraggs 16:91c2f9a144d4 55
icraggs 16:91c2f9a144d4 56 typedef struct limits
icraggs 16:91c2f9a144d4 57 {
icraggs 16:91c2f9a144d4 58 int MAX_MQTT_PACKET_SIZE; //
icraggs 16:91c2f9a144d4 59 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler
icraggs 16:91c2f9a144d4 60 int command_timeout;
icraggs 16:91c2f9a144d4 61
icraggs 16:91c2f9a144d4 62 limits()
icraggs 16:91c2f9a144d4 63 {
icraggs 16:91c2f9a144d4 64 MAX_MQTT_PACKET_SIZE = 100;
icraggs 16:91c2f9a144d4 65 MAX_MESSAGE_HANDLERS = 5;
icraggs 16:91c2f9a144d4 66 command_timeout = 30;
icraggs 16:91c2f9a144d4 67 }
icraggs 16:91c2f9a144d4 68 } Limits;
icraggs 16:91c2f9a144d4 69
icraggs 16:91c2f9a144d4 70
icraggs 16:91c2f9a144d4 71 template<class Network, class Timer> class Client
icraggs 16:91c2f9a144d4 72 {
icraggs 16:91c2f9a144d4 73
icraggs 16:91c2f9a144d4 74 public:
icraggs 16:91c2f9a144d4 75
icraggs 16:91c2f9a144d4 76 Client(Network* network, Limits* limits = 0);
icraggs 16:91c2f9a144d4 77
icraggs 16:91c2f9a144d4 78 int connect(MQTTPacket_connectData* options = 0);
icraggs 16:91c2f9a144d4 79
icraggs 16:91c2f9a144d4 80 int publish(const char* topic, Message* message);
icraggs 16:91c2f9a144d4 81
icraggs 16:91c2f9a144d4 82 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 16:91c2f9a144d4 83
icraggs 16:91c2f9a144d4 84 int unsubscribe(const char* topicFilter);
icraggs 16:91c2f9a144d4 85
icraggs 16:91c2f9a144d4 86 int disconnect(int timeout);
icraggs 16:91c2f9a144d4 87
icraggs 16:91c2f9a144d4 88 private:
icraggs 16:91c2f9a144d4 89
icraggs 16:91c2f9a144d4 90 int cycle(int timeout);
icraggs 16:91c2f9a144d4 91 int keepalive();
icraggs 16:91c2f9a144d4 92
icraggs 16:91c2f9a144d4 93 int decodePacket(int* value, int timeout);
icraggs 16:91c2f9a144d4 94 int readPacket(int timeout = -1);
icraggs 16:91c2f9a144d4 95 int sendPacket(int length, int timeout = -1);
icraggs 16:91c2f9a144d4 96 int deliverMessage(MQTTString* topic, Message* message);
icraggs 16:91c2f9a144d4 97
icraggs 16:91c2f9a144d4 98 Network* ipstack;
icraggs 16:91c2f9a144d4 99 Timer command_timer, ping_timer;
icraggs 16:91c2f9a144d4 100
icraggs 16:91c2f9a144d4 101 char* buf;
icraggs 16:91c2f9a144d4 102 int buflen;
icraggs 16:91c2f9a144d4 103
icraggs 16:91c2f9a144d4 104 char* readbuf;
icraggs 16:91c2f9a144d4 105 int readbuflen;
icraggs 16:91c2f9a144d4 106
icraggs 16:91c2f9a144d4 107 unsigned int keepAliveInterval;
icraggs 16:91c2f9a144d4 108 bool ping_outstanding;
icraggs 16:91c2f9a144d4 109
icraggs 16:91c2f9a144d4 110 int command_timeout; // max time to wait for any MQTT command to complete, in seconds
icraggs 16:91c2f9a144d4 111 PacketId packetid;
icraggs 16:91c2f9a144d4 112
icraggs 16:91c2f9a144d4 113 typedef FP<void, Message*> messageHandlerFP;
icraggs 16:91c2f9a144d4 114 struct MessageHandlers
icraggs 16:91c2f9a144d4 115 {
icraggs 16:91c2f9a144d4 116 char* topic;
icraggs 16:91c2f9a144d4 117 messageHandlerFP fp;
icraggs 16:91c2f9a144d4 118 } *messageHandlers; // Message handlers are linked to a subscription topic
icraggs 16:91c2f9a144d4 119 int messageHandlerCount;
icraggs 16:91c2f9a144d4 120
icraggs 16:91c2f9a144d4 121 };
icraggs 16:91c2f9a144d4 122
icraggs 16:91c2f9a144d4 123 }
icraggs 16:91c2f9a144d4 124
icraggs 16:91c2f9a144d4 125 template<class Network, class Timer, class Thread, class Mutex> MQTT::Client<Network, Timer, Thread, Mutex>::Client(Network* network, Limits* limits) : packetid()
icraggs 16:91c2f9a144d4 126 {
icraggs 16:91c2f9a144d4 127 Limits default_limits = Limits();
icraggs 16:91c2f9a144d4 128
icraggs 16:91c2f9a144d4 129 if (limits == 0)
icraggs 16:91c2f9a144d4 130 limits = &default_limits;
icraggs 16:91c2f9a144d4 131
icraggs 16:91c2f9a144d4 132 this->command_timeout = limits->command_timeout;
icraggs 16:91c2f9a144d4 133 this->ipstack = network;
icraggs 16:91c2f9a144d4 134 this->command_timer = Timer();
icraggs 16:91c2f9a144d4 135 this->ping_timer = Timer();
icraggs 16:91c2f9a144d4 136 this->ping_outstanding = 0;
icraggs 16:91c2f9a144d4 137
icraggs 16:91c2f9a144d4 138 // How to make these memory allocations portable? I was hoping to avoid the heap
icraggs 16:91c2f9a144d4 139 buflen = readbuflen = limits->MAX_MQTT_PACKET_SIZE;
icraggs 16:91c2f9a144d4 140 buf = new char[limits->MAX_MQTT_PACKET_SIZE];
icraggs 16:91c2f9a144d4 141 readbuf = new char[limits->MAX_MQTT_PACKET_SIZE];
icraggs 16:91c2f9a144d4 142 this->messageHandlers = new struct MessageHandlers[limits->MAX_MESSAGE_HANDLERS];
icraggs 16:91c2f9a144d4 143 }
icraggs 16:91c2f9a144d4 144
icraggs 16:91c2f9a144d4 145
icraggs 16:91c2f9a144d4 146 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
icraggs 16:91c2f9a144d4 147 {
icraggs 16:91c2f9a144d4 148 int sent = 0;
icraggs 16:91c2f9a144d4 149
icraggs 16:91c2f9a144d4 150 while (sent < length)
icraggs 16:91c2f9a144d4 151 sent += ipstack->write(&buf[sent], length, -1);
icraggs 16:91c2f9a144d4 152 if (sent == length)
icraggs 16:91c2f9a144d4 153 ping_timer.reset(); // record the fact that we have successfully sent the packet
icraggs 16:91c2f9a144d4 154 return sent;
icraggs 16:91c2f9a144d4 155 }
icraggs 16:91c2f9a144d4 156
icraggs 16:91c2f9a144d4 157
icraggs 16:91c2f9a144d4 158 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
icraggs 16:91c2f9a144d4 159 {
icraggs 16:91c2f9a144d4 160 char c;
icraggs 16:91c2f9a144d4 161 int multiplier = 1;
icraggs 16:91c2f9a144d4 162 int len = 0;
icraggs 16:91c2f9a144d4 163 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 16:91c2f9a144d4 164
icraggs 16:91c2f9a144d4 165 *value = 0;
icraggs 16:91c2f9a144d4 166 do
icraggs 16:91c2f9a144d4 167 {
icraggs 16:91c2f9a144d4 168 int rc = MQTTPACKET_READ_ERROR;
icraggs 16:91c2f9a144d4 169
icraggs 16:91c2f9a144d4 170 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 16:91c2f9a144d4 171 {
icraggs 16:91c2f9a144d4 172 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 16:91c2f9a144d4 173 goto exit;
icraggs 16:91c2f9a144d4 174 }
icraggs 16:91c2f9a144d4 175 rc = ipstack->read(&c, 1, timeout);
icraggs 16:91c2f9a144d4 176 if (rc != 1)
icraggs 16:91c2f9a144d4 177 goto exit;
icraggs 16:91c2f9a144d4 178 *value += (c & 127) * multiplier;
icraggs 16:91c2f9a144d4 179 multiplier *= 128;
icraggs 16:91c2f9a144d4 180 } while ((c & 128) != 0);
icraggs 16:91c2f9a144d4 181 exit:
icraggs 16:91c2f9a144d4 182 return len;
icraggs 16:91c2f9a144d4 183 }
icraggs 16:91c2f9a144d4 184
icraggs 16:91c2f9a144d4 185
icraggs 16:91c2f9a144d4 186 /**
icraggs 16:91c2f9a144d4 187 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 16:91c2f9a144d4 188 * the packets can be retried.
icraggs 16:91c2f9a144d4 189 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 16:91c2f9a144d4 190 * @return the MQTT packet type, or -1 if none
icraggs 16:91c2f9a144d4 191 */
icraggs 16:91c2f9a144d4 192 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::readPacket(int timeout)
icraggs 16:91c2f9a144d4 193 {
icraggs 16:91c2f9a144d4 194 int rc = -1;
icraggs 16:91c2f9a144d4 195 MQTTHeader header = {0};
icraggs 16:91c2f9a144d4 196 int len = 0;
icraggs 16:91c2f9a144d4 197 int rem_len = 0;
icraggs 16:91c2f9a144d4 198
icraggs 16:91c2f9a144d4 199 /* 1. read the header byte. This has the packet type in it */
icraggs 16:91c2f9a144d4 200 if (ipstack->read(readbuf, 1, timeout) != 1)
icraggs 16:91c2f9a144d4 201 goto exit;
icraggs 16:91c2f9a144d4 202
icraggs 16:91c2f9a144d4 203 len = 1;
icraggs 16:91c2f9a144d4 204 /* 2. read the remaining length. This is variable in itself */
icraggs 16:91c2f9a144d4 205 decodePacket(&rem_len, timeout);
icraggs 16:91c2f9a144d4 206 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 16:91c2f9a144d4 207
icraggs 16:91c2f9a144d4 208 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 16:91c2f9a144d4 209 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
icraggs 16:91c2f9a144d4 210 goto exit;
icraggs 16:91c2f9a144d4 211
icraggs 16:91c2f9a144d4 212 header.byte = readbuf[0];
icraggs 16:91c2f9a144d4 213 rc = header.bits.type;
icraggs 16:91c2f9a144d4 214 exit:
icraggs 16:91c2f9a144d4 215 return rc;
icraggs 16:91c2f9a144d4 216 }
icraggs 16:91c2f9a144d4 217
icraggs 16:91c2f9a144d4 218
icraggs 16:91c2f9a144d4 219 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
icraggs 16:91c2f9a144d4 220 {
icraggs 16:91c2f9a144d4 221 // we have to find the right message handler - indexed by topic
icraggs 16:91c2f9a144d4 222 }
icraggs 16:91c2f9a144d4 223
icraggs 16:91c2f9a144d4 224
icraggs 16:91c2f9a144d4 225 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout)
icraggs 16:91c2f9a144d4 226 {
icraggs 16:91c2f9a144d4 227 /* get one piece of work off the wire and one pass through */
icraggs 16:91c2f9a144d4 228
icraggs 16:91c2f9a144d4 229 // read the socket, see what work is due
icraggs 16:91c2f9a144d4 230 int packet_type = readPacket(timeout);
icraggs 16:91c2f9a144d4 231
icraggs 16:91c2f9a144d4 232 int len, rc;
icraggs 16:91c2f9a144d4 233 switch (packet_type)
icraggs 16:91c2f9a144d4 234 {
icraggs 16:91c2f9a144d4 235 case CONNACK:
icraggs 16:91c2f9a144d4 236 case PUBACK:
icraggs 16:91c2f9a144d4 237 case SUBACK:
icraggs 16:91c2f9a144d4 238 break;
icraggs 16:91c2f9a144d4 239 case PUBLISH:
icraggs 16:91c2f9a144d4 240 MQTTString topicName;
icraggs 16:91c2f9a144d4 241 Message msg;
icraggs 16:91c2f9a144d4 242 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
icraggs 16:91c2f9a144d4 243 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
icraggs 16:91c2f9a144d4 244 if (msg.qos == QOS0)
icraggs 16:91c2f9a144d4 245 deliverMessage(&topicName, &msg);
icraggs 16:91c2f9a144d4 246 break;
icraggs 16:91c2f9a144d4 247 case PUBREC:
icraggs 16:91c2f9a144d4 248 int type, dup, mypacketid;
icraggs 16:91c2f9a144d4 249 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 250 ;
icraggs 16:91c2f9a144d4 251 // must lock this access against the application thread, if we are multi-threaded
icraggs 16:91c2f9a144d4 252 len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
icraggs 16:91c2f9a144d4 253 rc = sendPacket(len); // send the subscribe packet
icraggs 16:91c2f9a144d4 254 if (rc != len)
icraggs 16:91c2f9a144d4 255 goto exit; // there was a problem
icraggs 16:91c2f9a144d4 256
icraggs 16:91c2f9a144d4 257 break;
icraggs 16:91c2f9a144d4 258 case PUBCOMP:
icraggs 16:91c2f9a144d4 259 break;
icraggs 16:91c2f9a144d4 260 case PINGRESP:
icraggs 16:91c2f9a144d4 261 ping_outstanding = false;
icraggs 16:91c2f9a144d4 262 break;
icraggs 16:91c2f9a144d4 263 }
icraggs 16:91c2f9a144d4 264 keepalive();
icraggs 16:91c2f9a144d4 265 exit:
icraggs 16:91c2f9a144d4 266 return packet_type;
icraggs 16:91c2f9a144d4 267 }
icraggs 16:91c2f9a144d4 268
icraggs 16:91c2f9a144d4 269
icraggs 16:91c2f9a144d4 270 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::keepalive()
icraggs 16:91c2f9a144d4 271 {
icraggs 16:91c2f9a144d4 272 int rc = 0;
icraggs 16:91c2f9a144d4 273
icraggs 16:91c2f9a144d4 274 if (keepAliveInterval == 0)
icraggs 16:91c2f9a144d4 275 goto exit;
icraggs 16:91c2f9a144d4 276
icraggs 16:91c2f9a144d4 277 if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
icraggs 16:91c2f9a144d4 278 {
icraggs 16:91c2f9a144d4 279 if (ping_outstanding)
icraggs 16:91c2f9a144d4 280 rc = -1;
icraggs 16:91c2f9a144d4 281 else
icraggs 16:91c2f9a144d4 282 {
icraggs 16:91c2f9a144d4 283 int len = MQTTSerialize_pingreq(buf, buflen);
icraggs 16:91c2f9a144d4 284 rc = sendPacket(len); // send the connect packet
icraggs 16:91c2f9a144d4 285 if (rc != len)
icraggs 16:91c2f9a144d4 286 rc = -1; // indicate there's a problem
icraggs 16:91c2f9a144d4 287 else
icraggs 16:91c2f9a144d4 288 ping_outstanding = true;
icraggs 16:91c2f9a144d4 289 }
icraggs 16:91c2f9a144d4 290 }
icraggs 16:91c2f9a144d4 291
icraggs 16:91c2f9a144d4 292 exit:
icraggs 16:91c2f9a144d4 293 return rc;
icraggs 16:91c2f9a144d4 294 }
icraggs 16:91c2f9a144d4 295
icraggs 16:91c2f9a144d4 296
icraggs 16:91c2f9a144d4 297 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
icraggs 16:91c2f9a144d4 298 {
icraggs 16:91c2f9a144d4 299 command_timer.start();
icraggs 16:91c2f9a144d4 300
icraggs 16:91c2f9a144d4 301 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 16:91c2f9a144d4 302 if (options == 0)
icraggs 16:91c2f9a144d4 303 options = &default_options; // set default options if none were supplied
icraggs 16:91c2f9a144d4 304
icraggs 16:91c2f9a144d4 305 this->keepAliveInterval = options->keepAliveInterval;
icraggs 16:91c2f9a144d4 306 ping_timer.start();
icraggs 16:91c2f9a144d4 307 int len = MQTTSerialize_connect(buf, buflen, options);
icraggs 16:91c2f9a144d4 308 int rc = sendPacket(len); // send the connect packet
icraggs 16:91c2f9a144d4 309 if (rc != len)
icraggs 16:91c2f9a144d4 310 goto exit; // there was a problem
icraggs 16:91c2f9a144d4 311
icraggs 16:91c2f9a144d4 312 // this will be a blocking call, wait for the connack
icraggs 16:91c2f9a144d4 313 do
icraggs 16:91c2f9a144d4 314 {
icraggs 16:91c2f9a144d4 315 if (command_timer.read_ms() > (command_timeout * 1000))
icraggs 16:91c2f9a144d4 316 goto exit; // we timed out
icraggs 16:91c2f9a144d4 317 }
icraggs 16:91c2f9a144d4 318 while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
icraggs 16:91c2f9a144d4 319 int connack_rc = -1;
icraggs 16:91c2f9a144d4 320 if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 321 rc = connack_rc;
icraggs 16:91c2f9a144d4 322
icraggs 16:91c2f9a144d4 323 exit:
icraggs 16:91c2f9a144d4 324 command_timer.stop();
icraggs 16:91c2f9a144d4 325 command_timer.reset();
icraggs 16:91c2f9a144d4 326 return rc;
icraggs 16:91c2f9a144d4 327 }
icraggs 16:91c2f9a144d4 328
icraggs 16:91c2f9a144d4 329
icraggs 16:91c2f9a144d4 330 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 16:91c2f9a144d4 331 {
icraggs 16:91c2f9a144d4 332 command_timer.start();
icraggs 16:91c2f9a144d4 333
icraggs 16:91c2f9a144d4 334 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 16:91c2f9a144d4 335
icraggs 16:91c2f9a144d4 336 int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 16:91c2f9a144d4 337 int rc = sendPacket(len); // send the subscribe packet
icraggs 16:91c2f9a144d4 338 if (rc != len)
icraggs 16:91c2f9a144d4 339 goto exit; // there was a problem
icraggs 16:91c2f9a144d4 340
icraggs 16:91c2f9a144d4 341 // wait for suback - this will block
icraggs 16:91c2f9a144d4 342 if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
icraggs 16:91c2f9a144d4 343 {
icraggs 16:91c2f9a144d4 344 int count = 0, grantedQoS = -1, mypacketid;
icraggs 16:91c2f9a144d4 345 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 346 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 16:91c2f9a144d4 347 if (rc != 0x80)
icraggs 16:91c2f9a144d4 348 }
icraggs 16:91c2f9a144d4 349
icraggs 16:91c2f9a144d4 350 exit:
icraggs 16:91c2f9a144d4 351 command_timer.stop();
icraggs 16:91c2f9a144d4 352 command_timer.reset();
icraggs 16:91c2f9a144d4 353 return rc;
icraggs 16:91c2f9a144d4 354 }
icraggs 16:91c2f9a144d4 355
icraggs 16:91c2f9a144d4 356
icraggs 16:91c2f9a144d4 357 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
icraggs 16:91c2f9a144d4 358 {
icraggs 16:91c2f9a144d4 359 command_timer.start();
icraggs 16:91c2f9a144d4 360
icraggs 16:91c2f9a144d4 361 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 16:91c2f9a144d4 362
icraggs 16:91c2f9a144d4 363 int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
icraggs 16:91c2f9a144d4 364 int rc = sendPacket(len); // send the subscribe packet
icraggs 16:91c2f9a144d4 365 if (rc != len)
icraggs 16:91c2f9a144d4 366 goto exit; // there was a problem
icraggs 16:91c2f9a144d4 367
icraggs 16:91c2f9a144d4 368 // this will block
icraggs 16:91c2f9a144d4 369 if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
icraggs 16:91c2f9a144d4 370 {
icraggs 16:91c2f9a144d4 371 int mypacketid;
icraggs 16:91c2f9a144d4 372 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 373 rc = 0;
icraggs 16:91c2f9a144d4 374 }
icraggs 16:91c2f9a144d4 375
icraggs 16:91c2f9a144d4 376 exit:
icraggs 16:91c2f9a144d4 377 command_timer.stop();
icraggs 16:91c2f9a144d4 378 command_timer.reset();
icraggs 16:91c2f9a144d4 379 return rc;
icraggs 16:91c2f9a144d4 380 }
icraggs 16:91c2f9a144d4 381
icraggs 16:91c2f9a144d4 382
icraggs 16:91c2f9a144d4 383
icraggs 16:91c2f9a144d4 384 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler)
icraggs 16:91c2f9a144d4 385 {
icraggs 16:91c2f9a144d4 386 command_timer.start();
icraggs 16:91c2f9a144d4 387
icraggs 16:91c2f9a144d4 388 MQTTString topic = {(char*)topicName, 0, 0};
icraggs 16:91c2f9a144d4 389
icraggs 16:91c2f9a144d4 390 message->id = packetid.getNext();
icraggs 16:91c2f9a144d4 391
icraggs 16:91c2f9a144d4 392 int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
icraggs 16:91c2f9a144d4 393 int rc = sendPacket(len); // send the subscribe packet
icraggs 16:91c2f9a144d4 394 if (rc != len)
icraggs 16:91c2f9a144d4 395 goto exit; // there was a problem
icraggs 16:91c2f9a144d4 396
icraggs 16:91c2f9a144d4 397 /* wait for acks */
icraggs 16:91c2f9a144d4 398 if (resultHandler == 0)
icraggs 16:91c2f9a144d4 399 {
icraggs 16:91c2f9a144d4 400 if (message->qos == QOS1)
icraggs 16:91c2f9a144d4 401 {
icraggs 16:91c2f9a144d4 402 if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
icraggs 16:91c2f9a144d4 403 {
icraggs 16:91c2f9a144d4 404 int type, dup, mypacketid;
icraggs 16:91c2f9a144d4 405 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 406 rc = 0;
icraggs 16:91c2f9a144d4 407 }
icraggs 16:91c2f9a144d4 408 }
icraggs 16:91c2f9a144d4 409 else if (message->qos == QOS2)
icraggs 16:91c2f9a144d4 410 {
icraggs 16:91c2f9a144d4 411 if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
icraggs 16:91c2f9a144d4 412 {
icraggs 16:91c2f9a144d4 413 int type, dup, mypacketid;
icraggs 16:91c2f9a144d4 414 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
icraggs 16:91c2f9a144d4 415 rc = 0;
icraggs 16:91c2f9a144d4 416 }
icraggs 16:91c2f9a144d4 417
icraggs 16:91c2f9a144d4 418 }
icraggs 16:91c2f9a144d4 419 }
icraggs 16:91c2f9a144d4 420
icraggs 16:91c2f9a144d4 421 exit:
icraggs 16:91c2f9a144d4 422 command_timer.stop();
icraggs 16:91c2f9a144d4 423 command_timer.reset();
icraggs 16:91c2f9a144d4 424 return rc;
icraggs 16:91c2f9a144d4 425 }
icraggs 16:91c2f9a144d4 426
icraggs 16:91c2f9a144d4 427
icraggs 16:91c2f9a144d4 428 #endif