M2X MQTT Client for ARM MBED

Dependents:   WNCInterface_M2XMQTTdemo

Committer:
citrusbyte
Date:
Thu Sep 15 10:17:00 2016 +0000
Revision:
1:c4d41ff3c58e
Parent:
0:5a4798128c36
Update library to latest version

Who changed what in which revision?

User | Revision | Line number | New contents of line
citrusbyte 0:5a4798128c36 1 #ifndef M2XMQTTCLIENT_H_
citrusbyte 0:5a4798128c36 2 #define M2XMQTTCLIENT_H_
citrusbyte 0:5a4798128c36 3
citrusbyte 0:5a4798128c36 4 #if (!defined(MBED_PLATFORM)) && (!defined(LINUX_PLATFORM))
citrusbyte 0:5a4798128c36 5 #error "Platform definition is missing!"
citrusbyte 0:5a4798128c36 6 #endif
citrusbyte 0:5a4798128c36 7
citrusbyte 0:5a4798128c36 8 #define M2X_VERSION "0.1.0"
citrusbyte 0:5a4798128c36 9
citrusbyte 0:5a4798128c36 10 #ifdef MBED_PLATFORM
citrusbyte 0:5a4798128c36 11 #include "m2x-mbed.h"
citrusbyte 0:5a4798128c36 12 #endif /* MBED_PLATFORM */
citrusbyte 0:5a4798128c36 13
citrusbyte 0:5a4798128c36 14 #ifdef LINUX_PLATFORM
citrusbyte 0:5a4798128c36 15 #include "m2x-linux.h"
citrusbyte 0:5a4798128c36 16 #endif /* LINUX_PLATFORM */
citrusbyte 0:5a4798128c36 17
/* The platform header may supply DBG/DBGLN/DBGLNEND. When it does not, fall
 * back to no-op versions so that debug statements compile away to nothing
 * (their arguments are not evaluated). */
#if !defined(DBG)
#define DBG(fmt_, data_)
#define DBGLN(fmt_, data_)
#define DBGLNEND
#endif /* !defined(DBG) */
citrusbyte 0:5a4798128c36 24
/* Smaller of two values. This is a function-like macro: an argument may be
 * evaluated more than once, so do not pass expressions with side effects. */
#define MIN(a, b) (((b) < (a)) ? (b) : (a))
/* Maps a value in 0..15 to its uppercase hexadecimal digit character. */
#define TO_HEX(t_) ((char) (((t_) <= 9) ? ((t_) + '0') : ((t_) - 10 + 'A')))
/* NOTE(review): presumably the number of digits used when printing doubles;
 * no use is visible in this part of the file - confirm. */
#define MAX_DOUBLE_DIGITS 7
citrusbyte 0:5a4798128c36 28
citrusbyte 0:5a4798128c36 29 /* For tolower */
citrusbyte 0:5a4798128c36 30 #include <ctype.h>
citrusbyte 0:5a4798128c36 31
/* Client-side status codes. They are zero or negative, so they never overlap
 * the positive 2xx/4xx/5xx codes tested by the m2x_status_is_* helpers. */
static const int E_OK               =  0;
static const int E_NOCONNECTION     = -1;  /* could not connect to the server */
static const int E_DISCONNECTED     = -2;  /* handshake failed after connecting */
static const int E_NOTREACHABLE     = -3;
static const int E_INVALID          = -4;
static const int E_JSON_INVALID     = -5;
static const int E_BUFFER_TOO_SMALL = -6;
static const int E_TIMESTAMP_ERROR  = -8;

/* Defaults used by the M2XMQTTClient constructor for its host/port arguments. */
static const char *DEFAULT_M2X_HOST = "api-m2x.att.com";
static const int   DEFAULT_M2X_PORT = 1883;
citrusbyte 0:5a4798128c36 43
citrusbyte 0:5a4798128c36 44 static inline bool m2x_status_is_success(int status) {
citrusbyte 0:5a4798128c36 45 return (status == E_OK) || (status >= 200 && status <= 299);
citrusbyte 0:5a4798128c36 46 }
citrusbyte 0:5a4798128c36 47
/* True when `status` lies in the 4xx range (400..499). */
static inline bool m2x_status_is_client_error(int status) {
  return !(status < 400 || status > 499);
}
citrusbyte 0:5a4798128c36 51
/* True when `status` lies in the 5xx range (500..599). */
static inline bool m2x_status_is_server_error(int status) {
  return !(status < 500 || status > 599);
}
citrusbyte 0:5a4798128c36 55
citrusbyte 0:5a4798128c36 56 static inline bool m2x_status_is_error(int status) {
citrusbyte 0:5a4798128c36 57 return m2x_status_is_client_error(status) ||
citrusbyte 0:5a4798128c36 58 m2x_status_is_server_error(status);
citrusbyte 0:5a4798128c36 59 }
citrusbyte 0:5a4798128c36 60
citrusbyte 0:5a4798128c36 61 // Null Print class used to calculate length to print
citrusbyte 0:5a4798128c36 62 class NullPrint : public Print {
citrusbyte 0:5a4798128c36 63 public:
citrusbyte 0:5a4798128c36 64 size_t counter;
citrusbyte 0:5a4798128c36 65
citrusbyte 0:5a4798128c36 66 virtual size_t write(uint8_t b) {
citrusbyte 0:5a4798128c36 67 counter++;
citrusbyte 0:5a4798128c36 68 return 1;
citrusbyte 0:5a4798128c36 69 }
citrusbyte 0:5a4798128c36 70
citrusbyte 0:5a4798128c36 71 virtual size_t write(const uint8_t* buf, size_t size) {
citrusbyte 0:5a4798128c36 72 counter += size;
citrusbyte 0:5a4798128c36 73 return size;
citrusbyte 0:5a4798128c36 74 }
citrusbyte 0:5a4798128c36 75 };
citrusbyte 0:5a4798128c36 76
citrusbyte 0:5a4798128c36 77 // Handy helper class for printing MQTT payload using a Print
citrusbyte 0:5a4798128c36 78 class MMQTTPrint : public Print {
citrusbyte 0:5a4798128c36 79 public:
citrusbyte 0:5a4798128c36 80 mmqtt_connection *connection;
citrusbyte 0:5a4798128c36 81 mmqtt_s_puller puller;
citrusbyte 0:5a4798128c36 82
citrusbyte 0:5a4798128c36 83 virtual size_t write(uint8_t b) {
citrusbyte 0:5a4798128c36 84 return write(&b, 1);
citrusbyte 0:5a4798128c36 85 }
citrusbyte 0:5a4798128c36 86
citrusbyte 0:5a4798128c36 87 virtual size_t write(const uint8_t* buf, size_t size) {
citrusbyte 0:5a4798128c36 88 return mmqtt_s_encode_buffer(connection, puller, buf, size) == MMQTT_STATUS_OK ? size : -1;
citrusbyte 0:5a4798128c36 89 }
citrusbyte 0:5a4798128c36 90 };
citrusbyte 0:5a4798128c36 91
citrusbyte 0:5a4798128c36 92 class M2XMQTTClient {
citrusbyte 0:5a4798128c36 93 public:
citrusbyte 0:5a4798128c36 94 M2XMQTTClient(Client* client,
citrusbyte 0:5a4798128c36 95 const char* key,
citrusbyte 0:5a4798128c36 96 void (* idlefunc)(void) = NULL,
citrusbyte 0:5a4798128c36 97 bool keepalive = true,
citrusbyte 0:5a4798128c36 98 const char* host = DEFAULT_M2X_HOST,
citrusbyte 0:5a4798128c36 99 int port = DEFAULT_M2X_PORT,
citrusbyte 0:5a4798128c36 100 const char* path_prefix = NULL);
citrusbyte 0:5a4798128c36 101
citrusbyte 0:5a4798128c36 102 // Push data stream value using PUT request, returns the HTTP status code
citrusbyte 0:5a4798128c36 103 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
citrusbyte 0:5a4798128c36 104 // the device ID here.
citrusbyte 0:5a4798128c36 105 template <class T>
citrusbyte 0:5a4798128c36 106 int updateStreamValue(const char* deviceId, const char* streamName, T value);
citrusbyte 0:5a4798128c36 107
citrusbyte 0:5a4798128c36 108 // Post multiple values to M2X all at once.
citrusbyte 0:5a4798128c36 109 // +deviceId+ - id of the device to post values
citrusbyte 0:5a4798128c36 110 // +streamNum+ - Number of streams to post
citrusbyte 0:5a4798128c36 111 // +names+ - Array of stream names, the length of the array should
citrusbyte 0:5a4798128c36 112 // be exactly +streamNum+
citrusbyte 0:5a4798128c36 113 // +counts+ - Array of +streamNum+ length, each item in this array
citrusbyte 0:5a4798128c36 114 // containing the number of values we want to post for each stream
citrusbyte 0:5a4798128c36 115 // +ats+ - Timestamps for each value, the length of this array should
citrusbyte 0:5a4798128c36 116 // be the some of all values in +counts+, for the first +counts[0]+
citrusbyte 0:5a4798128c36 117 // items, the values belong to the first stream, for the following
citrusbyte 0:5a4798128c36 118 // +counts[1]+ number of items, the values belong to the second stream,
citrusbyte 0:5a4798128c36 119 // etc. Notice that timestamps are required here: you must provide
citrusbyte 0:5a4798128c36 120 // a timestamp for each value posted.
citrusbyte 0:5a4798128c36 121 // +values+ - Values to post. This works the same way as +ats+, the
citrusbyte 0:5a4798128c36 122 // first +counts[0]+ number of items contain values to post to the first
citrusbyte 0:5a4798128c36 123 // stream, the succeeding +counts[1]+ number of items contain values
citrusbyte 0:5a4798128c36 124 // for the second stream, etc. The length of this array should be
citrusbyte 0:5a4798128c36 125 // the sum of all values in +counts+ array.
citrusbyte 0:5a4798128c36 126 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
citrusbyte 0:5a4798128c36 127 // the device ID here.
citrusbyte 0:5a4798128c36 128 template <class T>
citrusbyte 0:5a4798128c36 129 int postDeviceUpdates(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 130 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 131 const char* ats[], T values[]);
citrusbyte 0:5a4798128c36 132
citrusbyte 0:5a4798128c36 133 // Post multiple values of a single device at once.
citrusbyte 0:5a4798128c36 134 // +deviceId+ - id of the device to post values
citrusbyte 0:5a4798128c36 135 // +streamNum+ - Number of streams to post
citrusbyte 0:5a4798128c36 136 // +names+ - Array of stream names, the length of the array should
citrusbyte 0:5a4798128c36 137 // be exactly +streamNum+
citrusbyte 0:5a4798128c36 138 // +values+ - Array of values to post, the length of the array should
citrusbyte 0:5a4798128c36 139 // be exactly +streamNum+. Notice that the array of +values+ should
citrusbyte 0:5a4798128c36 140 // match the array of +names+, and that the ith value in +values+ is
citrusbyte 0:5a4798128c36 141 // exactly the value to post for the ith stream name in +names+
citrusbyte 0:5a4798128c36 142 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
citrusbyte 0:5a4798128c36 143 // the device ID here.
citrusbyte 0:5a4798128c36 144 template <class T>
citrusbyte 0:5a4798128c36 145 int postDeviceUpdate(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 146 const char* names[], T values[],
citrusbyte 0:5a4798128c36 147 const char* at = NULL);
citrusbyte 0:5a4798128c36 148
citrusbyte 0:5a4798128c36 149 // Update datasource location
citrusbyte 0:5a4798128c36 150 // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
citrusbyte 0:5a4798128c36 151 // 4-byte (32 bits) precision, which is the same as float. So there's
citrusbyte 0:5a4798128c36 152 // no natural double-precision floating number on these boards. With
citrusbyte 0:5a4798128c36 153 // a float value, we have a precision of roughly 7 digits, that means
citrusbyte 0:5a4798128c36 154 // either 5 or 6 digits after the floating point. According to wikipedia,
citrusbyte 0:5a4798128c36 155 // a difference of 0.00001 will give us ~1.1132m distance. If this
citrusbyte 0:5a4798128c36 156 // precision is good for you, you can use the double-version we provided
citrusbyte 0:5a4798128c36 157 // here. Otherwise, you may need to use the string-version and do the
citrusbyte 0:5a4798128c36 158 // actual conversion by yourselves.
citrusbyte 0:5a4798128c36 159 // However, with an Arduino Due board, double has 8-bytes (64 bits)
citrusbyte 0:5a4798128c36 160 // precision, which means you are free to use the double-version only
citrusbyte 0:5a4798128c36 161 // without any precision problems.
citrusbyte 0:5a4798128c36 162 // Returned value is the http status code.
citrusbyte 0:5a4798128c36 163 // NOTE: if you want to update by a serial, use "serial/<serial ID>" as
citrusbyte 0:5a4798128c36 164 // the device ID here.
citrusbyte 0:5a4798128c36 165 template <class T>
citrusbyte 0:5a4798128c36 166 int updateLocation(const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 167 T latitude, T longitude, T elevation);
citrusbyte 0:5a4798128c36 168
citrusbyte 1:c4d41ff3c58e 169 // Delete values from a data stream
citrusbyte 1:c4d41ff3c58e 170 // You will need to provide from and end date/time strings in the ISO8601
citrusbyte 1:c4d41ff3c58e 171 // format "yyyy-mm-ddTHH:MM:SS.SSSZ" where
citrusbyte 1:c4d41ff3c58e 172 // yyyy: the year
citrusbyte 1:c4d41ff3c58e 173 // mm: the month
citrusbyte 1:c4d41ff3c58e 174 // dd: the day
citrusbyte 1:c4d41ff3c58e 175 // HH: the hour (24 hour format)
citrusbyte 1:c4d41ff3c58e 176 // MM: the minute
citrusbyte 1:c4d41ff3c58e 177 // SS.SSS: the seconds (to the millisecond)
citrusbyte 1:c4d41ff3c58e 178 // NOTE: the time is given in Zulu (GMT)
citrusbyte 1:c4d41ff3c58e 179 // M2X will delete all values within the from to end date/time range.
citrusbyte 1:c4d41ff3c58e 180 // The status code is 204 on success and 400 on a bad request (e.g. the
citrusbyte 1:c4d41ff3c58e 181 // timestamp is not in ISO8601 format or the from timestamp is not less than
citrusbyte 1:c4d41ff3c58e 182 // or equal to the end timestamp.
citrusbyte 1:c4d41ff3c58e 183 int deleteValues(const char* deviceId, const char* streamName,
citrusbyte 1:c4d41ff3c58e 184 const char* from, const char* end);
citrusbyte 1:c4d41ff3c58e 185
citrusbyte 0:5a4798128c36 186 // Following fields are public so mmqtt callback functions can access directly
citrusbyte 0:5a4798128c36 187 Client* _client;
citrusbyte 0:5a4798128c36 188 private:
citrusbyte 0:5a4798128c36 189 struct mmqtt_connection _connection;
citrusbyte 0:5a4798128c36 190 const char* _key;
citrusbyte 0:5a4798128c36 191 uint16_t _key_length;
citrusbyte 0:5a4798128c36 192 bool _connected;
citrusbyte 0:5a4798128c36 193 bool _keepalive;
citrusbyte 0:5a4798128c36 194 const char* _host;
citrusbyte 0:5a4798128c36 195 int _port;
citrusbyte 0:5a4798128c36 196 void (* _idlefunc)(void);
citrusbyte 0:5a4798128c36 197 const char* _path_prefix;
citrusbyte 0:5a4798128c36 198 NullPrint _null_print;
citrusbyte 0:5a4798128c36 199 MMQTTPrint _mmqtt_print;
citrusbyte 0:5a4798128c36 200 int16_t _current_id;
citrusbyte 0:5a4798128c36 201
citrusbyte 0:5a4798128c36 202 int connectToServer();
citrusbyte 0:5a4798128c36 203
citrusbyte 0:5a4798128c36 204 template <class T>
citrusbyte 0:5a4798128c36 205 int printUpdateStreamValuePayload(Print* print, const char* deviceId,
citrusbyte 0:5a4798128c36 206 const char* streamName, T value);
citrusbyte 0:5a4798128c36 207
citrusbyte 0:5a4798128c36 208 template <class T>
citrusbyte 0:5a4798128c36 209 int printPostDeviceUpdatesPayload(Print* print,
citrusbyte 0:5a4798128c36 210 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 211 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 212 const char* ats[], T values[]);
citrusbyte 0:5a4798128c36 213
citrusbyte 0:5a4798128c36 214 template <class T>
citrusbyte 0:5a4798128c36 215 int printPostDeviceUpdatePayload(Print* print,
citrusbyte 0:5a4798128c36 216 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 217 const char* names[], T values[],
citrusbyte 0:5a4798128c36 218 const char* at = NULL);
citrusbyte 0:5a4798128c36 219
citrusbyte 0:5a4798128c36 220 template <class T>
citrusbyte 0:5a4798128c36 221 int printUpdateLocationPayload(Print* print,
citrusbyte 0:5a4798128c36 222 const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 223 T latitude, T longitude, T elevation);
citrusbyte 0:5a4798128c36 224
citrusbyte 1:c4d41ff3c58e 225 int printDeleteValuesPayload(Print* print,
citrusbyte 1:c4d41ff3c58e 226 const char* deviceId, const char* streamName,
citrusbyte 1:c4d41ff3c58e 227 const char* from, const char* end);
citrusbyte 1:c4d41ff3c58e 228
citrusbyte 0:5a4798128c36 229 int readStatusCode();
citrusbyte 0:5a4798128c36 230 void close();
citrusbyte 0:5a4798128c36 231 };
citrusbyte 0:5a4798128c36 232
citrusbyte 0:5a4798128c36 233 // Implementations
citrusbyte 0:5a4798128c36 234 M2XMQTTClient::M2XMQTTClient(Client* client,
citrusbyte 0:5a4798128c36 235 const char* key,
citrusbyte 0:5a4798128c36 236 void (* idlefunc)(void),
citrusbyte 0:5a4798128c36 237 bool keepalive,
citrusbyte 0:5a4798128c36 238 const char* host,
citrusbyte 0:5a4798128c36 239 int port,
citrusbyte 0:5a4798128c36 240 const char* path_prefix) : _client(client),
citrusbyte 0:5a4798128c36 241 _key(key),
citrusbyte 0:5a4798128c36 242 _idlefunc(idlefunc),
citrusbyte 0:5a4798128c36 243 _keepalive(keepalive),
citrusbyte 0:5a4798128c36 244 _connected(false),
citrusbyte 0:5a4798128c36 245 _host(host),
citrusbyte 0:5a4798128c36 246 _port(port),
citrusbyte 0:5a4798128c36 247 _path_prefix(path_prefix),
citrusbyte 0:5a4798128c36 248 _null_print(),
citrusbyte 0:5a4798128c36 249 _mmqtt_print(),
citrusbyte 0:5a4798128c36 250 _current_id(0) {
citrusbyte 0:5a4798128c36 251 _key_length = strlen(_key);
citrusbyte 0:5a4798128c36 252 }
citrusbyte 0:5a4798128c36 253
citrusbyte 0:5a4798128c36 254 mmqtt_status_t m2x_mmqtt_puller(struct mmqtt_connection *connection) {
citrusbyte 0:5a4798128c36 255 const uint8_t *data = NULL;
citrusbyte 0:5a4798128c36 256 mmqtt_ssize_t length = 0;
citrusbyte 0:5a4798128c36 257 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 258 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
citrusbyte 0:5a4798128c36 259 Client *c = client->_client;
citrusbyte 0:5a4798128c36 260 struct mmqtt_stream *stream = mmqtt_connection_pullable_stream(connection);
citrusbyte 0:5a4798128c36 261 if (stream == NULL) { return MMQTT_STATUS_NOT_PULLABLE; }
citrusbyte 0:5a4798128c36 262
citrusbyte 0:5a4798128c36 263 status = mmqtt_stream_external_pullable(stream, &data, &length);
citrusbyte 0:5a4798128c36 264 if (status != MMQTT_STATUS_OK) { return status; }
citrusbyte 0:5a4798128c36 265
citrusbyte 0:5a4798128c36 266 length = c->write(data, length);
citrusbyte 0:5a4798128c36 267 if (length < 0) {
citrusbyte 0:5a4798128c36 268 c->stop();
citrusbyte 0:5a4798128c36 269 return MMQTT_STATUS_BROKEN_CONNECTION;
citrusbyte 0:5a4798128c36 270 }
citrusbyte 0:5a4798128c36 271 mmqtt_stream_external_pull(stream, length);
citrusbyte 0:5a4798128c36 272 if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
citrusbyte 0:5a4798128c36 273 mmqtt_connection_release_write_stream(connection, stream);
citrusbyte 0:5a4798128c36 274 }
citrusbyte 0:5a4798128c36 275 return MMQTT_STATUS_OK;
citrusbyte 0:5a4798128c36 276 }
citrusbyte 0:5a4798128c36 277
citrusbyte 0:5a4798128c36 278 mmqtt_status_t m2x_mmqtt_pusher(struct mmqtt_connection *connection, mmqtt_ssize_t max_size) {
citrusbyte 0:5a4798128c36 279 uint8_t *data = NULL;
citrusbyte 0:5a4798128c36 280 mmqtt_ssize_t length = 0, i = 0;
citrusbyte 0:5a4798128c36 281 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 282 M2XMQTTClient *client = (M2XMQTTClient *) connection->connection;
citrusbyte 0:5a4798128c36 283 Client *c = client->_client;
citrusbyte 0:5a4798128c36 284 struct mmqtt_stream *stream = mmqtt_connection_pushable_stream(connection);
citrusbyte 0:5a4798128c36 285 if (stream == NULL) { return MMQTT_STATUS_NOT_PUSHABLE; }
citrusbyte 0:5a4798128c36 286
citrusbyte 0:5a4798128c36 287 status = mmqtt_stream_external_pushable(stream, &data, &length);
citrusbyte 0:5a4798128c36 288 if (status != MMQTT_STATUS_OK) { return status; }
citrusbyte 0:5a4798128c36 289 length = min(length, max_size);
citrusbyte 0:5a4798128c36 290
citrusbyte 0:5a4798128c36 291 /* Maybe we need another field in signature documenting how much data we want,
citrusbyte 0:5a4798128c36 292 * so we can handle end condition gracefully? Not 100% if `left` field in
citrusbyte 0:5a4798128c36 293 * mmqtt_stream is enough
citrusbyte 0:5a4798128c36 294 */
citrusbyte 0:5a4798128c36 295 while (i < length && c->available()) {
citrusbyte 0:5a4798128c36 296 data[i++] = c->read();
citrusbyte 0:5a4798128c36 297 }
citrusbyte 0:5a4798128c36 298 mmqtt_stream_external_push(stream, i);
citrusbyte 0:5a4798128c36 299 return MMQTT_STATUS_OK;
citrusbyte 0:5a4798128c36 300 }
citrusbyte 0:5a4798128c36 301
citrusbyte 0:5a4798128c36 302 size_t m2x_mjson_reader(struct mjson_ctx *ctx, char *data, size_t limit)
citrusbyte 0:5a4798128c36 303 {
citrusbyte 0:5a4798128c36 304 mmqtt_connection *connection = (mmqtt_connection *) ctx->userdata;
citrusbyte 0:5a4798128c36 305 return mmqtt_s_decode_buffer(connection, m2x_mmqtt_pusher, (uint8_t *) data, limit,
citrusbyte 0:5a4798128c36 306 limit) == MMQTT_STATUS_OK ? limit : 0;
citrusbyte 0:5a4798128c36 307 }
citrusbyte 0:5a4798128c36 308
citrusbyte 0:5a4798128c36 309 int M2XMQTTClient::connectToServer() {
citrusbyte 0:5a4798128c36 310 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 311 struct mmqtt_p_connect_header connect_header;
citrusbyte 0:5a4798128c36 312 struct mmqtt_p_connack_header connack_header;
citrusbyte 0:5a4798128c36 313 uint32_t packet_length;
citrusbyte 0:5a4798128c36 314 uint8_t flag;
citrusbyte 0:5a4798128c36 315 uint16_t length;
citrusbyte 0:5a4798128c36 316 uint8_t name[6];
citrusbyte 0:5a4798128c36 317
citrusbyte 0:5a4798128c36 318 if (_client->connect(_host, _port)) {
citrusbyte 0:5a4798128c36 319 DBGLN("%s", F("Connected to M2X MQTT server!"));
citrusbyte 0:5a4798128c36 320 mmqtt_connection_init(&_connection, this);
citrusbyte 0:5a4798128c36 321 /* Send CONNECT packet first */
citrusbyte 0:5a4798128c36 322 connect_header.name = name;
citrusbyte 0:5a4798128c36 323 strncpy((char *)name, F("MQIsdp"), 6);
citrusbyte 0:5a4798128c36 324 connect_header.name_length = connect_header.name_max_length = 6;
citrusbyte 0:5a4798128c36 325 connect_header.protocol_version = 3;
citrusbyte 0:5a4798128c36 326 /* Clean session with username set */
citrusbyte 0:5a4798128c36 327 connect_header.flags = 0x82;
citrusbyte 0:5a4798128c36 328 connect_header.keepalive = 60;
citrusbyte 0:5a4798128c36 329 packet_length = mmqtt_s_connect_header_encoded_length(&connect_header) +
citrusbyte 1:c4d41ff3c58e 330 mmqtt_s_string_encoded_length(_key_length) +
citrusbyte 0:5a4798128c36 331 mmqtt_s_string_encoded_length(_key_length);
citrusbyte 0:5a4798128c36 332 status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 333 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_CONNECT),
citrusbyte 0:5a4798128c36 334 packet_length);
citrusbyte 0:5a4798128c36 335 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 336 DBG("%s", F("Error sending connect packet fixed header: "));
citrusbyte 0:5a4798128c36 337 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 338 _client->stop();
citrusbyte 0:5a4798128c36 339 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 340 }
citrusbyte 0:5a4798128c36 341 status = mmqtt_s_encode_connect_header(&_connection, m2x_mmqtt_puller, &connect_header);
citrusbyte 0:5a4798128c36 342 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 343 DBG("%s", F("Error sending connect packet variable header: "));
citrusbyte 0:5a4798128c36 344 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 345 _client->stop();
citrusbyte 0:5a4798128c36 346 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 347 }
citrusbyte 1:c4d41ff3c58e 348 /* Client ID */
citrusbyte 1:c4d41ff3c58e 349 status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
citrusbyte 1:c4d41ff3c58e 350 (const uint8_t *) _key, _key_length);
citrusbyte 1:c4d41ff3c58e 351 if (status != MMQTT_STATUS_OK) {
citrusbyte 1:c4d41ff3c58e 352 DBG("%s", F("Error sending connect packet payload: "));
citrusbyte 1:c4d41ff3c58e 353 DBGLN("%d", status);
citrusbyte 1:c4d41ff3c58e 354 _client->stop();
citrusbyte 1:c4d41ff3c58e 355 return E_DISCONNECTED;
citrusbyte 1:c4d41ff3c58e 356 }
citrusbyte 1:c4d41ff3c58e 357 /* Username */
citrusbyte 0:5a4798128c36 358 status = mmqtt_s_encode_string(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 359 (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 360 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 361 DBG("%s", F("Error sending connect packet payload: "));
citrusbyte 0:5a4798128c36 362 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 363 _client->stop();
citrusbyte 0:5a4798128c36 364 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 365 }
citrusbyte 0:5a4798128c36 366 /* Check CONNACK packet */
citrusbyte 0:5a4798128c36 367 do {
citrusbyte 0:5a4798128c36 368 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
citrusbyte 0:5a4798128c36 369 &flag, &packet_length);
citrusbyte 0:5a4798128c36 370 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 371 DBG("%s", F("Error decoding connack fixed header: "));
citrusbyte 0:5a4798128c36 372 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 373 _client->stop();
citrusbyte 0:5a4798128c36 374 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 375 }
citrusbyte 0:5a4798128c36 376 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK) {
citrusbyte 0:5a4798128c36 377 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
citrusbyte 0:5a4798128c36 378 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 379 DBG("%s", F("Error skipping non-connack packet: "));
citrusbyte 0:5a4798128c36 380 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 381 _client->stop();
citrusbyte 0:5a4798128c36 382 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 383 }
citrusbyte 0:5a4798128c36 384 }
citrusbyte 0:5a4798128c36 385 } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_CONNACK);
citrusbyte 0:5a4798128c36 386 status = mmqtt_s_decode_connack_header(&_connection, m2x_mmqtt_pusher,
citrusbyte 0:5a4798128c36 387 &connack_header);
citrusbyte 0:5a4798128c36 388 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 389 DBG("%s", F("Error decoding connack variable header: "));
citrusbyte 0:5a4798128c36 390 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 391 _client->stop();
citrusbyte 0:5a4798128c36 392 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 393 }
citrusbyte 0:5a4798128c36 394 if (connack_header.return_code != 0x0) {
citrusbyte 0:5a4798128c36 395 DBG("%s", F("CONNACK return code is not accepted: "));
citrusbyte 0:5a4798128c36 396 DBGLN("%d", connack_header.return_code);
citrusbyte 0:5a4798128c36 397 _client->stop();
citrusbyte 0:5a4798128c36 398 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 399 }
citrusbyte 0:5a4798128c36 400 /* Send SUBSCRIBE packet*/
citrusbyte 0:5a4798128c36 401 length = _key_length + 15 + 4;
citrusbyte 0:5a4798128c36 402 status = mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 403 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_SUBSCRIBE) | 0x2,
citrusbyte 0:5a4798128c36 404 length);
citrusbyte 0:5a4798128c36 405 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 406 DBG("%s", F("Error sending subscribe packet fixed header: "));
citrusbyte 0:5a4798128c36 407 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 408 _client->stop();
citrusbyte 0:5a4798128c36 409 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 410 }
citrusbyte 0:5a4798128c36 411 // Subscribe packet must use QoS 1
citrusbyte 0:5a4798128c36 412 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, 0);
citrusbyte 0:5a4798128c36 413 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 14);
citrusbyte 0:5a4798128c36 414 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 415 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 416 // The extra one is QoS, added here to save a function call
citrusbyte 0:5a4798128c36 417 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/responses\0"), 11);
citrusbyte 0:5a4798128c36 418 /* Check SUBACK packet */
citrusbyte 0:5a4798128c36 419 do {
citrusbyte 0:5a4798128c36 420 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
citrusbyte 0:5a4798128c36 421 &flag, &packet_length);
citrusbyte 0:5a4798128c36 422 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 423 DBG("%s", F("Error decoding suback fixed header: "));
citrusbyte 0:5a4798128c36 424 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 425 _client->stop();
citrusbyte 0:5a4798128c36 426 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 427 }
citrusbyte 0:5a4798128c36 428 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK) {
citrusbyte 0:5a4798128c36 429 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
citrusbyte 0:5a4798128c36 430 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 431 DBG("%s", F("Error skipping packet: "));
citrusbyte 0:5a4798128c36 432 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 433 _client->stop();
citrusbyte 0:5a4798128c36 434 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 435 }
citrusbyte 0:5a4798128c36 436 }
citrusbyte 0:5a4798128c36 437 } while (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_SUBACK);
citrusbyte 0:5a4798128c36 438 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
citrusbyte 0:5a4798128c36 439 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 440 DBG("%s", F("Error skipping suback packet: "));
citrusbyte 0:5a4798128c36 441 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 442 _client->stop();
citrusbyte 0:5a4798128c36 443 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 444 }
citrusbyte 0:5a4798128c36 445 _mmqtt_print.connection = &_connection;
citrusbyte 0:5a4798128c36 446 _mmqtt_print.puller = m2x_mmqtt_puller;
citrusbyte 0:5a4798128c36 447 _connected = true;
citrusbyte 0:5a4798128c36 448 return E_OK;
citrusbyte 0:5a4798128c36 449 } else {
citrusbyte 0:5a4798128c36 450 DBGLN("%s", F("ERROR: Cannot connect to M2X MQTT server!"));
citrusbyte 0:5a4798128c36 451 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 452 }
citrusbyte 0:5a4798128c36 453 }
citrusbyte 0:5a4798128c36 454
citrusbyte 0:5a4798128c36 455 template <class T>
citrusbyte 0:5a4798128c36 456 int M2XMQTTClient::updateStreamValue(const char* deviceId, const char* streamName, T value) {
citrusbyte 0:5a4798128c36 457 int length;
citrusbyte 0:5a4798128c36 458 if (!_connected) {
citrusbyte 0:5a4798128c36 459 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 460 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 461 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 462 }
citrusbyte 0:5a4798128c36 463 }
citrusbyte 0:5a4798128c36 464 _current_id++;
citrusbyte 0:5a4798128c36 465 length = printUpdateStreamValuePayload(&_null_print, deviceId, streamName, value);
citrusbyte 0:5a4798128c36 466 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 467 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 468 length + _key_length + 15);
citrusbyte 0:5a4798128c36 469 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 470 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 471 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 472 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 473 printUpdateStreamValuePayload(&_mmqtt_print, deviceId, streamName, value);
citrusbyte 0:5a4798128c36 474 return readStatusCode();
citrusbyte 0:5a4798128c36 475 }
citrusbyte 0:5a4798128c36 476
citrusbyte 0:5a4798128c36 477 template <class T>
citrusbyte 0:5a4798128c36 478 int M2XMQTTClient::printUpdateStreamValuePayload(Print* print, const char* deviceId,
citrusbyte 0:5a4798128c36 479 const char* streamName, T value) {
citrusbyte 0:5a4798128c36 480 int bytes = 0;
citrusbyte 0:5a4798128c36 481 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 482 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 483 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
citrusbyte 0:5a4798128c36 484 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 485 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 486 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 487 bytes += print->print(F("/streams/"));
citrusbyte 0:5a4798128c36 488 bytes += print->print(streamName);
citrusbyte 0:5a4798128c36 489 bytes += print->print(F("/value"));
citrusbyte 0:5a4798128c36 490 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 491 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 492 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 493 bytes += print->print(F("{\"value\":\""));
citrusbyte 0:5a4798128c36 494 bytes += print->print(value);
citrusbyte 0:5a4798128c36 495 bytes += print->print(F("\"}"));
citrusbyte 0:5a4798128c36 496 bytes += print->print(F("}"));
citrusbyte 1:c4d41ff3c58e 497 return bytes;
citrusbyte 0:5a4798128c36 498 }
citrusbyte 0:5a4798128c36 499
citrusbyte 0:5a4798128c36 500 template <class T>
citrusbyte 0:5a4798128c36 501 int M2XMQTTClient::postDeviceUpdates(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 502 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 503 const char* ats[], T values[]) {
citrusbyte 0:5a4798128c36 504 int length;
citrusbyte 0:5a4798128c36 505 if (!_connected) {
citrusbyte 0:5a4798128c36 506 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 507 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 508 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 509 }
citrusbyte 0:5a4798128c36 510 }
citrusbyte 0:5a4798128c36 511 _current_id++;
citrusbyte 0:5a4798128c36 512 length = printPostDeviceUpdatesPayload(&_null_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 513 names, counts, ats, values);
citrusbyte 0:5a4798128c36 514 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 515 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 516 length + _key_length + 15);
citrusbyte 0:5a4798128c36 517 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 518 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 519 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 520 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 521 printPostDeviceUpdatesPayload(&_mmqtt_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 522 names, counts, ats, values);
citrusbyte 0:5a4798128c36 523 return readStatusCode();
citrusbyte 0:5a4798128c36 524 }
citrusbyte 0:5a4798128c36 525
citrusbyte 0:5a4798128c36 526 template <class T>
citrusbyte 0:5a4798128c36 527 int M2XMQTTClient::printPostDeviceUpdatesPayload(Print* print,
citrusbyte 0:5a4798128c36 528 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 529 const char* names[], const int counts[],
citrusbyte 0:5a4798128c36 530 const char* ats[], T values[]) {
citrusbyte 0:5a4798128c36 531 int bytes = 0, value_index = 0, i, j;
citrusbyte 0:5a4798128c36 532 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 533 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 534 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
citrusbyte 0:5a4798128c36 535 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 536 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 537 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 538 bytes += print->print(F("/updates"));
citrusbyte 0:5a4798128c36 539 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 540 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 541 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 542 bytes += print->print(F("{\"values\":{"));
citrusbyte 0:5a4798128c36 543 for (i = 0; i < streamNum; i++) {
citrusbyte 0:5a4798128c36 544 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 545 bytes += print->print(names[i]);
citrusbyte 0:5a4798128c36 546 bytes += print->print(F("\":["));
citrusbyte 0:5a4798128c36 547 for (j = 0; j < counts[i]; j++) {
citrusbyte 0:5a4798128c36 548 bytes += print->print(F("{\"timestamp\": \""));
citrusbyte 0:5a4798128c36 549 bytes += print->print(ats[value_index]);
citrusbyte 0:5a4798128c36 550 bytes += print->print(F("\",\"value\": \""));
citrusbyte 0:5a4798128c36 551 bytes += print->print(values[value_index]);
citrusbyte 0:5a4798128c36 552 bytes += print->print(F("\"}"));
citrusbyte 0:5a4798128c36 553 if (j < counts[i] - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 554 value_index++;
citrusbyte 0:5a4798128c36 555 }
citrusbyte 0:5a4798128c36 556 bytes += print->print(F("]"));
citrusbyte 0:5a4798128c36 557 if (i < streamNum - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 558 }
citrusbyte 0:5a4798128c36 559 bytes += print->print(F(("}}}")));
citrusbyte 0:5a4798128c36 560 return bytes;
citrusbyte 0:5a4798128c36 561 }
citrusbyte 0:5a4798128c36 562
citrusbyte 0:5a4798128c36 563 template <class T>
citrusbyte 0:5a4798128c36 564 int M2XMQTTClient::postDeviceUpdate(const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 565 const char* names[], T values[],
citrusbyte 0:5a4798128c36 566 const char* at) {
citrusbyte 0:5a4798128c36 567 int length;
citrusbyte 0:5a4798128c36 568 if (!_connected) {
citrusbyte 0:5a4798128c36 569 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 570 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 571 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 572 }
citrusbyte 0:5a4798128c36 573 }
citrusbyte 0:5a4798128c36 574 _current_id++;
citrusbyte 0:5a4798128c36 575 length = printPostDeviceUpdatePayload(&_null_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 576 names, values, at);
citrusbyte 0:5a4798128c36 577 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 578 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 579 length + _key_length + 15);
citrusbyte 0:5a4798128c36 580 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 581 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 582 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 583 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 584 printPostDeviceUpdatePayload(&_mmqtt_print, deviceId, streamNum,
citrusbyte 0:5a4798128c36 585 names, values, at);
citrusbyte 0:5a4798128c36 586 return readStatusCode();
citrusbyte 0:5a4798128c36 587 }
citrusbyte 0:5a4798128c36 588
citrusbyte 0:5a4798128c36 589 template <class T>
citrusbyte 0:5a4798128c36 590 int M2XMQTTClient::printPostDeviceUpdatePayload(Print* print,
citrusbyte 0:5a4798128c36 591 const char* deviceId, int streamNum,
citrusbyte 0:5a4798128c36 592 const char* names[], T values[],
citrusbyte 0:5a4798128c36 593 const char* at) {
citrusbyte 0:5a4798128c36 594 int bytes = 0, i;
citrusbyte 0:5a4798128c36 595 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 596 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 597 bytes += print->print(F("\",\"method\":\"POST\",\"resource\":\""));
citrusbyte 0:5a4798128c36 598 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 599 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 600 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 601 bytes += print->print(F("/update"));
citrusbyte 0:5a4798128c36 602 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 603 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 604 bytes += print->print(F("\",\"body\":"));
citrusbyte 0:5a4798128c36 605 bytes += print->print(F("{\"values\":{"));
citrusbyte 0:5a4798128c36 606 for (int i = 0; i < streamNum; i++) {
citrusbyte 0:5a4798128c36 607 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 608 bytes += print->print(names[i]);
citrusbyte 0:5a4798128c36 609 bytes += print->print(F("\": \""));
citrusbyte 0:5a4798128c36 610 bytes += print->print(values[i]);
citrusbyte 0:5a4798128c36 611 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 612 if (i < streamNum - 1) { bytes += print->print(F(",")); }
citrusbyte 0:5a4798128c36 613 }
citrusbyte 0:5a4798128c36 614 bytes += print->print(F("}"));
citrusbyte 0:5a4798128c36 615 if (at != NULL) {
citrusbyte 0:5a4798128c36 616 bytes += print->print(F(",\"timestamp\":\""));
citrusbyte 0:5a4798128c36 617 bytes += print->print(at);
citrusbyte 0:5a4798128c36 618 bytes += print->print(F("\""));
citrusbyte 0:5a4798128c36 619 }
citrusbyte 0:5a4798128c36 620 bytes += print->print(F(("}")));
citrusbyte 0:5a4798128c36 621 return bytes;
citrusbyte 0:5a4798128c36 622 }
citrusbyte 0:5a4798128c36 623
citrusbyte 0:5a4798128c36 624 template <class T>
citrusbyte 0:5a4798128c36 625 int M2XMQTTClient::updateLocation(const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 626 T latitude, T longitude, T elevation) {
citrusbyte 0:5a4798128c36 627 int length;
citrusbyte 0:5a4798128c36 628 if (!_connected) {
citrusbyte 0:5a4798128c36 629 if (connectToServer() != E_OK) {
citrusbyte 0:5a4798128c36 630 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 0:5a4798128c36 631 return E_NOCONNECTION;
citrusbyte 0:5a4798128c36 632 }
citrusbyte 0:5a4798128c36 633 }
citrusbyte 0:5a4798128c36 634 _current_id++;
citrusbyte 0:5a4798128c36 635 length = printUpdateLocationPayload(&_null_print, deviceId, name,
citrusbyte 0:5a4798128c36 636 latitude, longitude, elevation);
citrusbyte 0:5a4798128c36 637 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 0:5a4798128c36 638 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 0:5a4798128c36 639 length + _key_length + 15);
citrusbyte 0:5a4798128c36 640 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 0:5a4798128c36 641 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 0:5a4798128c36 642 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 0:5a4798128c36 643 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 0:5a4798128c36 644 printUpdateLocationPayload(&_mmqtt_print, deviceId, name,
citrusbyte 0:5a4798128c36 645 latitude, longitude, elevation);
citrusbyte 0:5a4798128c36 646 return readStatusCode();
citrusbyte 0:5a4798128c36 647 }
citrusbyte 0:5a4798128c36 648
citrusbyte 0:5a4798128c36 649 template <class T>
citrusbyte 0:5a4798128c36 650 int M2XMQTTClient::printUpdateLocationPayload(Print* print,
citrusbyte 0:5a4798128c36 651 const char* deviceId, const char* name,
citrusbyte 0:5a4798128c36 652 T latitude, T longitude, T elevation) {
citrusbyte 0:5a4798128c36 653 int bytes = 0;
citrusbyte 0:5a4798128c36 654 bytes += print->print(F("{\"id\":\""));
citrusbyte 0:5a4798128c36 655 bytes += print->print(_current_id);
citrusbyte 0:5a4798128c36 656 bytes += print->print(F("\",\"method\":\"PUT\",\"resource\":\""));
citrusbyte 0:5a4798128c36 657 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 0:5a4798128c36 658 bytes += print->print(F("/v2/devices/"));
citrusbyte 0:5a4798128c36 659 bytes += print->print(deviceId);
citrusbyte 0:5a4798128c36 660 bytes += print->print(F("/location"));
citrusbyte 0:5a4798128c36 661 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 0:5a4798128c36 662 bytes += print->print(USER_AGENT);
citrusbyte 0:5a4798128c36 663 bytes += print->print(F("\",\"body\":{\"name\":\""));
citrusbyte 0:5a4798128c36 664 bytes += print->print(name);
citrusbyte 0:5a4798128c36 665 bytes += print->print(F("\",\"latitude\":\""));
citrusbyte 0:5a4798128c36 666 bytes += print->print(latitude);
citrusbyte 0:5a4798128c36 667 bytes += print->print(F("\",\"longitude\":\""));
citrusbyte 0:5a4798128c36 668 bytes += print->print(longitude);
citrusbyte 0:5a4798128c36 669 bytes += print->print(F("\",\"elevation\":\""));
citrusbyte 0:5a4798128c36 670 bytes += print->print(elevation);
citrusbyte 0:5a4798128c36 671 bytes += print->print(F(("\"}}")));
citrusbyte 0:5a4798128c36 672 return bytes;
citrusbyte 0:5a4798128c36 673 }
citrusbyte 0:5a4798128c36 674
citrusbyte 1:c4d41ff3c58e 675 int M2XMQTTClient::deleteValues(const char* deviceId, const char* streamName,
citrusbyte 1:c4d41ff3c58e 676 const char* from, const char* end) {
citrusbyte 1:c4d41ff3c58e 677 int length;
citrusbyte 1:c4d41ff3c58e 678 if (!_connected) {
citrusbyte 1:c4d41ff3c58e 679 if (connectToServer() != E_OK) {
citrusbyte 1:c4d41ff3c58e 680 DBGLN("%s", "ERROR: Cannot connect to M2X server!");
citrusbyte 1:c4d41ff3c58e 681 return E_NOCONNECTION;
citrusbyte 1:c4d41ff3c58e 682 }
citrusbyte 1:c4d41ff3c58e 683 }
citrusbyte 1:c4d41ff3c58e 684 _current_id++;
citrusbyte 1:c4d41ff3c58e 685 length = printDeleteValuesPayload(&_null_print, deviceId, streamName, from, end);
citrusbyte 1:c4d41ff3c58e 686 mmqtt_s_encode_fixed_header(&_connection, m2x_mmqtt_puller,
citrusbyte 1:c4d41ff3c58e 687 MMQTT_PACK_MESSAGE_TYPE(MMQTT_MESSAGE_TYPE_PUBLISH),
citrusbyte 1:c4d41ff3c58e 688 length + _key_length + 15);
citrusbyte 1:c4d41ff3c58e 689 mmqtt_s_encode_uint16(&_connection, m2x_mmqtt_puller, _key_length + 13);
citrusbyte 1:c4d41ff3c58e 690 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("m2x/"), 4);
citrusbyte 1:c4d41ff3c58e 691 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) _key, _key_length);
citrusbyte 1:c4d41ff3c58e 692 mmqtt_s_encode_buffer(&_connection, m2x_mmqtt_puller, (const uint8_t *) F("/requests"), 9);
citrusbyte 1:c4d41ff3c58e 693 printDeleteValuesPayload(&_mmqtt_print, deviceId, streamName, from, end);
citrusbyte 1:c4d41ff3c58e 694 return readStatusCode();
citrusbyte 1:c4d41ff3c58e 695 }
citrusbyte 1:c4d41ff3c58e 696
citrusbyte 1:c4d41ff3c58e 697 int M2XMQTTClient::printDeleteValuesPayload(Print *print,
citrusbyte 1:c4d41ff3c58e 698 const char* deviceId, const char* streamName,
citrusbyte 1:c4d41ff3c58e 699 const char* from, const char* end) {
citrusbyte 1:c4d41ff3c58e 700 int bytes = 0;
citrusbyte 1:c4d41ff3c58e 701 bytes += print->print(F("{\"id\":\""));
citrusbyte 1:c4d41ff3c58e 702 bytes += print->print(_current_id);
citrusbyte 1:c4d41ff3c58e 703 bytes += print->print(F("\",\"method\":\"DELETE\",\"resource\":\""));
citrusbyte 1:c4d41ff3c58e 704 if (_path_prefix) { bytes += print->print(_path_prefix); }
citrusbyte 1:c4d41ff3c58e 705 bytes += print->print(F("/v2/devices/"));
citrusbyte 1:c4d41ff3c58e 706 bytes += print->print(deviceId);
citrusbyte 1:c4d41ff3c58e 707 bytes += print->print(F("/streams/"));
citrusbyte 1:c4d41ff3c58e 708 bytes += print->print(streamName);
citrusbyte 1:c4d41ff3c58e 709 bytes += print->print(F("/values"));
citrusbyte 1:c4d41ff3c58e 710 bytes += print->print(F("\",\"agent\":\""));
citrusbyte 1:c4d41ff3c58e 711 bytes += print->print(USER_AGENT);
citrusbyte 1:c4d41ff3c58e 712 bytes += print->print(F("\",\"body\":{\"from\":\""));
citrusbyte 1:c4d41ff3c58e 713 bytes += print->print(from);
citrusbyte 1:c4d41ff3c58e 714 bytes += print->print(F("\",\"end\":\""));
citrusbyte 1:c4d41ff3c58e 715 bytes += print->print(end);
citrusbyte 1:c4d41ff3c58e 716 bytes += print->print(F(("\"}}")));
citrusbyte 1:c4d41ff3c58e 717 return bytes;
citrusbyte 1:c4d41ff3c58e 718 }
citrusbyte 1:c4d41ff3c58e 719
citrusbyte 0:5a4798128c36 720 int M2XMQTTClient::readStatusCode() {
citrusbyte 0:5a4798128c36 721 mmqtt_status_t status;
citrusbyte 0:5a4798128c36 722 uint32_t packet_length;
citrusbyte 0:5a4798128c36 723 uint8_t flag;
citrusbyte 0:5a4798128c36 724 int16_t parsed_id, response_status;
citrusbyte 0:5a4798128c36 725 struct mjson_ctx ctx;
citrusbyte 0:5a4798128c36 726 char buf[6];
citrusbyte 0:5a4798128c36 727 size_t buf_length;
citrusbyte 0:5a4798128c36 728
citrusbyte 0:5a4798128c36 729 while (true) {
citrusbyte 0:5a4798128c36 730 status = mmqtt_s_decode_fixed_header(&_connection, m2x_mmqtt_pusher,
citrusbyte 0:5a4798128c36 731 &flag, &packet_length);
citrusbyte 0:5a4798128c36 732 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 733 DBG("%s", F("Error decoding publish fixed header: "));
citrusbyte 0:5a4798128c36 734 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 735 close();
citrusbyte 0:5a4798128c36 736 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 737 }
citrusbyte 0:5a4798128c36 738 if (MMQTT_UNPACK_MESSAGE_TYPE(flag) != MMQTT_MESSAGE_TYPE_PUBLISH) {
citrusbyte 0:5a4798128c36 739 status = mmqtt_s_skip_buffer(&_connection, m2x_mmqtt_pusher, packet_length);
citrusbyte 0:5a4798128c36 740 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 741 DBG("%s", F("Error skipping non-publish packet: "));
citrusbyte 0:5a4798128c36 742 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 743 close();
citrusbyte 0:5a4798128c36 744 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 745 }
citrusbyte 0:5a4798128c36 746 } else {
citrusbyte 0:5a4798128c36 747 /*
citrusbyte 0:5a4798128c36 748 * Since we only subscribe to one channel, there's no need to check channel name.
citrusbyte 0:5a4798128c36 749 */
citrusbyte 0:5a4798128c36 750 status = mmqtt_s_decode_string(&_connection, m2x_mmqtt_pusher, NULL, 0,
citrusbyte 0:5a4798128c36 751 NULL, NULL);
citrusbyte 0:5a4798128c36 752 if (status != MMQTT_STATUS_OK) {
citrusbyte 0:5a4798128c36 753 DBG("%s", F("Error skipping channel name string: "));
citrusbyte 0:5a4798128c36 754 DBGLN("%d", status);
citrusbyte 0:5a4798128c36 755 close();
citrusbyte 0:5a4798128c36 756 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 757 }
citrusbyte 0:5a4798128c36 758 /*
citrusbyte 0:5a4798128c36 759 * Parse JSON body for ID and status
citrusbyte 0:5a4798128c36 760 */
citrusbyte 0:5a4798128c36 761 mjson_init(&ctx, &_connection, m2x_mjson_reader);
citrusbyte 0:5a4798128c36 762 parsed_id = -1;
citrusbyte 0:5a4798128c36 763 if (mjson_readcheck_object_start(&ctx) != MJSON_OK) {
citrusbyte 0:5a4798128c36 764 /* Oops we have an error */
citrusbyte 0:5a4798128c36 765 DBG("%s", F("Publish Packet is not a JSON object!"));
citrusbyte 0:5a4798128c36 766 close();
citrusbyte 0:5a4798128c36 767 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 768 }
citrusbyte 0:5a4798128c36 769 while (mjson_read_object_separator_or_end(&ctx) != MJSON_SUBTYPE_OBJECT_END) {
citrusbyte 0:5a4798128c36 770 if (mjson_readcheck_string_start(&ctx) != MJSON_OK) {
citrusbyte 0:5a4798128c36 771 DBG("%s", F("Object key is not string!"));
citrusbyte 0:5a4798128c36 772 close();
citrusbyte 0:5a4798128c36 773 return E_DISCONNECTED;
citrusbyte 0:5a4798128c36 774 }
citrusbyte 0:5a4798128c36 775 mjson_read_full_string(&ctx, buf, 6, &buf_length);
citrusbyte 0:5a4798128c36 776 mjson_read_object_key_separator(&ctx);
citrusbyte 0:5a4798128c36 777 if (strncmp(buf, F("status"), 6) == 0) {
citrusbyte 0:5a4798128c36 778 mjson_read_int16(&ctx, &response_status);
citrusbyte 0:5a4798128c36 779 } else if (strncmp(buf, F("id"), 2) == 0) {
citrusbyte 0:5a4798128c36 780 /* Hack since we know the ID passed is actually an integer*/
citrusbyte 0:5a4798128c36 781 mjson_readcheck_string_start(&ctx);
citrusbyte 0:5a4798128c36 782 mjson_read_int16(&ctx, &parsed_id);
citrusbyte 0:5a4798128c36 783 mjson_read_string_end(&ctx);
citrusbyte 0:5a4798128c36 784 } else {
citrusbyte 0:5a4798128c36 785 mjson_skip_value(&ctx);
citrusbyte 0:5a4798128c36 786 }
citrusbyte 0:5a4798128c36 787 }
citrusbyte 0:5a4798128c36 788 if (parsed_id == _current_id) { return response_status; }
citrusbyte 0:5a4798128c36 789 }
citrusbyte 0:5a4798128c36 790 }
citrusbyte 0:5a4798128c36 791 return E_NOTREACHABLE;
citrusbyte 0:5a4798128c36 792 }
citrusbyte 0:5a4798128c36 793
citrusbyte 0:5a4798128c36 794 void M2XMQTTClient::close() {
citrusbyte 0:5a4798128c36 795 _client->stop();
citrusbyte 0:5a4798128c36 796 _connected = false;
citrusbyte 0:5a4798128c36 797 }
citrusbyte 0:5a4798128c36 798
citrusbyte 0:5a4798128c36 799 #endif /* M2XMQTTCLIENT_H_ */