【mbed OS5対応バージョン】データの保存、更新、取得ができるWebサービス「milkcocoa」に接続し、データのプッシュ、送信、取得ができるライブラリです。 https://mlkcca.com/
Dependents: mbed-os-example-wifi-milkcocoa MilkcocoaOsSample_Eth MilkcocoaOsSample_ESP8266 MilkcocoaOsSample_Eth_DigitalIn
Revision 9:5c195c1036da, committed 2017-09-06
- Comitter:
- jksoft
- Date:
- Wed Sep 06 05:58:26 2017 +0000
- Parent:
- 8:e2f15b1b4f70
- Child:
- 10:c52abd2d6595
- Commit message:
- ?????mbed OS5.5.4?????
Changed in this revision
--- a/MQTT/FP/FP.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,212 +0,0 @@ -/** - * @file FP.h - * @brief Core Utility - Templated Function Pointer Class - * @author sam grove - * @version 1.1 - * @see http://mbed.org/users/sam_grove/code/FP/ - * - * Copyright (c) 2013 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FP_H -#define FP_H - -/** Example using the FP Class with global functions - * @code - * #include "mbed.h" - * #include "FP.h" - * - * FP<void,bool>fp; - * DigitalOut myled(LED1); - * - * void handler(bool value) - * { - * myled = value; - * return; - * } - * - * int main() - * { - * fp.attach(&handler); - * - * while(1) - * { - * fp(1); - * wait(0.2); - * fp(0); - * wait(0.2); - * } - * } - * @endcode - */ - -/** Example using the FP Class with different class member functions - * @code - * #include "mbed.h" - * #include "FP.h" - * - * FP<void,bool>fp; - * DigitalOut myled(LED4); - * - * class Wrapper - * { - * public: - * Wrapper(){} - * - * void handler(bool value) - * { - * myled = value; - * return; - * } - * }; - * - * int main() - * { - * Wrapper wrapped; - * fp.attach(&wrapped, &Wrapper::handler); - * - * while(1) - * { - * fp(1); - * wait(0.2); - * fp(0); - * wait(0.2); - * } - * } - * @endcode - */ - -/** Example using the FP Class with member FP and member function -* @code -* #include "mbed.h" -* #include "FP.h" -* -* DigitalOut myled(LED2); -* -* class Wrapper -* { -* public: -* Wrapper() -* { -* fp.attach(this, &Wrapper::handler); -* } -* -* void handler(bool value) -* { -* myled = value; -* return; -* } -* -* FP<void,bool>fp; -* }; -* -* int main() -* { -* Wrapper wrapped; -* -* while(1) -* { -* wrapped.fp(1); -* wait(0.2); -* wrapped.fp(0); -* wait(0.2); -* } -* } -* @endcode -*/ - -/** - * @class FP - * @brief API for managing Function Pointers - */ -template<class retT, class argT> -class FP_ -{ -public: - /** Create the FP object - only one callback can be attached to the object, that is - * a member function or a global function, not both at the same time - */ - FP_() - { - obj_callback = 0; - c_callback = 0; - } - - /** Add a callback function to the object - * @param item - Address of the initialized object - * @param member - Address of the member function (dont forget the scope that the function is defined in) - */ - template<class T> - void attach(T *item, retT (T::*method)(argT)) - { - obj_callback = (FPtrDummy *)(item); - method_callback = (retT (FPtrDummy::*)(argT))(method); - return; - } - - /** Add a callback function to the object - * @param function - The address of a globally defined function - */ - void attach(retT (*function)(argT)) - { - c_callback = function; - } - - /** Invoke the function attached to the class - * @param arg - An argument that is passed into the function handler that is called - * @return The return from the function hanlder called by this class - */ - retT operator()(argT arg) const - { - if( 0 != c_callback ) { - return obj_callback ? (obj_callback->*method_callback)(arg) : (*c_callback)(arg); - } - return (retT)0; - } - - /** Determine if an callback is currently hooked - * @return 1 if a method is hooked, 0 otherwise - */ - bool attached() - { - return obj_callback || c_callback; - } - - /** Release a function from the callback hook - */ - void detach() - { - obj_callback = 0; - c_callback = 0; - } - -private: - - // empty type used for casting - class FPtrDummy; - - FPtrDummy *obj_callback; - - /** - * @union Funciton - * @brief Member or global callback function - */ - union { - retT (*c_callback)(argT); /*!< Footprint for a global function */ - retT (FPtrDummy::*method_callback)(argT); /*!< Footprint for a member function */ - }; -}; - -#endif
--- a/MQTT/MQTTAsync.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,607 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#if !defined(MQTTASYNC_H) -#define MQTTASYNC_H - -#include "FP.h" -#include "MQTTPacket.h" -#include "stdio.h" - -namespace MQTT -{ - - -enum QoS { QOS0, QOS1, QOS2 }; - - -struct Message -{ - enum QoS qos; - bool retained; - bool dup; - unsigned short id; - void *payload; - size_t payloadlen; -}; - - -class PacketId -{ -public: - PacketId(); - - int getNext(); - -private: - static const int MAX_PACKET_ID = 65535; - int next; -}; - -typedef void (*messageHandler)(Message*); - -typedef struct limits -{ - int MAX_MQTT_PACKET_SIZE; // - int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler - int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode - int command_timeout_ms; - - limits() - { - MAX_MQTT_PACKET_SIZE = 256; - MAX_MESSAGE_HANDLERS = 5; - MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode - command_timeout_ms = 30000; - } -} Limits; - - -/** - * @class Async - * @brief non-blocking, threaded MQTT client API - * @param Network a network class which supports send, receive - * @param Timer a timer class with the methods: - */ -template<class Network, class Timer, class Thread, class Mutex> class Async -{ - -public: - - struct Result - { - /* success or failure result data */ - Async<Network, Timer, Thread, Mutex>* client; - int rc; - }; - - typedef void (*resultHandler)(Result*); - - Async(Network* network, const Limits limits = Limits()); - - typedef struct - { - Async* client; - Network* network; - } connectionLostInfo; - - typedef int (*connectionLostHandlers)(connectionLostInfo*); - - /** Set the connection lost callback - called whenever the connection is lost and we should be connected - * @param clh - pointer to the callback function - */ - void setConnectionLostHandler(connectionLostHandlers clh) - { - connectionLostHandler.attach(clh); - } - - /** Set the default message handling callback - used for any message which does not match a subscription message handler - * @param mh - pointer to the callback function - */ - void setDefaultMessageHandler(messageHandler mh) - { - defaultMessageHandler.attach(mh); - } - - int connect(resultHandler fn, MQTTPacket_connectData* options = 0); - - template<class T> - int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function - - int publish(resultHandler rh, const char* topic, Message* message); - - int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh); - - int unsubscribe(resultHandler rh, const char* topicFilter); - - int disconnect(resultHandler rh); - -private: - - void run(void const *argument); - int cycle(int timeout); - int waitfor(int packet_type, Timer& atimer); - int keepalive(); - int findFreeOperation(); - - int decodePacket(int* value, int timeout); - int readPacket(int timeout); - int sendPacket(int length, int timeout); - int deliverMessage(MQTTString* topic, Message* message); - - Thread* thread; - Network* ipstack; - - Limits limits; - - char* buf; - char* readbuf; - - Timer ping_timer, connect_timer; - unsigned int keepAliveInterval; - bool ping_outstanding; - - PacketId packetid; - - typedef FP_<void, Result*> resultHandlerFP; - resultHandlerFP connectHandler; - - typedef FP_<void, Message*> messageHandlerFP; - struct MessageHandlers - { - const char* topic; - messageHandlerFP fp; - } *messageHandlers; // Message handlers are indexed by subscription topic - - // how many concurrent operations should we allow? Each one will require a function pointer - struct Operations - { - unsigned short id; - resultHandlerFP fp; - const char* topic; // if this is a publish, store topic name in case republishing is required - Message* message; // for publish, - Timer timer; // to check if the command has timed out - } *operations; // result handlers are indexed by packet ids - - static void threadfn(void* arg); - - messageHandlerFP defaultMessageHandler; - - typedef FP_<int, connectionLostInfo*> connectionLostFP; - - connectionLostFP connectionLostHandler; - -}; - -} - - -template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg) -{ - ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL); -} - - -template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid() -{ - this->thread = 0; - this->ipstack = network; - this->ping_timer = Timer(); - this->ping_outstanding = 0; - - // How to make these memory allocations portable? I was hoping to avoid the heap - buf = new char[limits.MAX_MQTT_PACKET_SIZE]; - readbuf = new char[limits.MAX_MQTT_PACKET_SIZE]; - this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS]; - for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) - operations[i].id = 0; - this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS]; - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) - messageHandlers[i].topic = 0; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout) -{ - int sent = 0; - - while (sent < length) - sent += ipstack->write(&buf[sent], length, timeout); - if (sent == length) - ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet - return sent; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout) -{ - char c; - int multiplier = 1; - int len = 0; - const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; - - *value = 0; - do - { - int rc = MQTTPACKET_READ_ERROR; - - if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) - { - rc = MQTTPACKET_READ_ERROR; /* bad data */ - goto exit; - } - rc = ipstack->read(&c, 1, timeout); - if (rc != 1) - goto exit; - *value += (c & 127) * multiplier; - multiplier *= 128; - } while ((c & 128) != 0); -exit: - return len; -} - - -/** - * If any read fails in this method, then we should disconnect from the network, as on reconnect - * the packets can be retried. - * @param timeout the max time to wait for the packet read to complete, in milliseconds - * @return the MQTT packet type, or -1 if none - */ -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout) -{ - int rc = -1; - MQTTHeader header = {0}; - int len = 0; - int rem_len = 0; - - /* 1. read the header byte. This has the packet type in it */ - if (ipstack->read(readbuf, 1, timeout) != 1) - goto exit; - - len = 1; - /* 2. read the remaining length. This is variable in itself */ - decodePacket(&rem_len, timeout); - len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ - - /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) - goto exit; - - header.byte = readbuf[0]; - rc = header.bits.type; -exit: - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) -{ - int rc = -1; - - // we have to find the right message handler - indexed by topic - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic)) - { - messageHandlers[i].fp(message); - rc = 0; - break; - } - } - - return rc; -} - - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout) -{ - /* get one piece of work off the wire and one pass through */ - - // read the socket, see what work is due - int packet_type = readPacket(timeout); - - int len, rc; - switch (packet_type) - { - case CONNACK: - if (this->thread) - { - Result res = {this, 0}; - if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - ; - connectHandler(&res); - connectHandler.detach(); // only invoke the callback once - } - break; - case PUBACK: - if (this->thread) - ; //call resultHandler - case SUBACK: - break; - case PUBLISH: - MQTTString topicName; - Message msg; - rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; - if (msg.qos == QOS0) - deliverMessage(&topicName, &msg); - break; - case PUBREC: - int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - ; - // must lock this access against the application thread, if we are multi-threaded - len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid); - rc = sendPacket(len, timeout); // send the PUBREL packet - if (rc != len) - goto exit; // there was a problem - - break; - case PUBCOMP: - break; - case PINGRESP: - ping_outstanding = false; - break; - } - keepalive(); -exit: - return packet_type; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive() -{ - int rc = 0; - - if (keepAliveInterval == 0) - goto exit; - - if (ping_timer.expired()) - { - if (ping_outstanding) - rc = -1; - else - { - int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE); - rc = sendPacket(len, 1000); // send the ping packet - if (rc != len) - rc = -1; // indicate there's a problem - else - ping_outstanding = true; - } - } - -exit: - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument) -{ - while (true) - cycle(ping_timer.left_ms()); -} - - -// only used in single-threaded mode where one command at a time is in process -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer) -{ - int rc = -1; - - do - { - if (atimer.expired()) - break; // we timed out - } - while ((rc = cycle(atimer.left_ms())) != packet_type); - - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options) -{ - connect_timer.countdown(limits.command_timeout_ms); - - MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; - if (options == 0) - options = &default_options; // set default options if none were supplied - - this->keepAliveInterval = options->keepAliveInterval; - ping_timer.countdown(this->keepAliveInterval); - int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options); - int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet - if (rc != len) - goto exit; // there was a problem - - if (resultHandler == 0) // wait until the connack is received - { - // this will be a blocking call, wait for the connack - if (waitfor(CONNACK, connect_timer) == CONNACK) - { - int connack_rc = -1; - if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - rc = connack_rc; - } - } - else - { - // set connect response callback function - connectHandler.attach(resultHandler); - - // start background thread - this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this); - } - -exit: - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation() -{ - int found = -1; - for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i) - { - if (operations[i].id == 0) - { - found = i; - break; - } - } - return found; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler) -{ - int index = 0; - if (this->thread) - index = findFreeOperation(); - Timer& atimer = operations[index].timer; - - atimer.countdown(limits.command_timeout_ms); - MQTTString topic = {(char*)topicFilter, 0, 0}; - - int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); - int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet - if (rc != len) - goto exit; // there was a problem - - /* wait for suback */ - if (resultHandler == 0) - { - // this will block - if (waitfor(SUBACK, atimer) == SUBACK) - { - int count = 0, grantedQoS = -1, mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - rc = grantedQoS; // 0, 1, 2 or 0x80 - if (rc != 0x80) - { - for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topic == 0) - { - messageHandlers[i].topic = topicFilter; - messageHandlers[i].fp.attach(messageHandler); - rc = 0; - break; - } - } - } - } - } - else - { - // set subscribe response callback function - - } - -exit: - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter) -{ - int index = 0; - if (this->thread) - index = findFreeOperation(); - Timer& atimer = operations[index].timer; - - atimer.countdown(limits.command_timeout_ms); - MQTTString topic = {(char*)topicFilter, 0, 0}; - - int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); - int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet - if (rc != len) - goto exit; // there was a problem - - // set unsubscribe response callback function - - -exit: - return rc; -} - - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message) -{ - int index = 0; - if (this->thread) - index = findFreeOperation(); - Timer& atimer = operations[index].timer; - - atimer.countdown(limits.command_timeout_ms); - MQTTString topic = {(char*)topicName, 0, 0}; - - if (message->qos == QOS1 || message->qos == QOS2) - message->id = packetid.getNext(); - - int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); - int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet - if (rc != len) - goto exit; // there was a problem - - /* wait for acks */ - if (resultHandler == 0) - { - if (message->qos == QOS1) - { - if (waitfor(PUBACK, atimer) == PUBACK) - { - int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - rc = 0; - } - } - else if (message->qos == QOS2) - { - if (waitfor(PUBCOMP, atimer) == PUBCOMP) - { - int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) - rc = 0; - } - - } - } - else - { - // set publish response callback function - - } - -exit: - return rc; -} - - -template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler) -{ - Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete - int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE); - int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet - - return (rc == len) ? 0 : -1; -} - - - -#endif \ No newline at end of file
--- a/MQTT/MQTTClient.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,948 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014, 2015 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - * Ian Craggs - fix for bug 458512 - QoS 2 messages - * Ian Craggs - fix for bug 460389 - send loop uses wrong length - * Ian Craggs - fix for bug 464169 - clearing subscriptions - * Ian Craggs - fix for bug 464551 - enums and ints can be different size - *******************************************************************************/ - -#if !defined(MQTTCLIENT_H) -#define MQTTCLIENT_H - -#include "FP.h" -#include "MQTTPacket.h" -#include "stdio.h" -#include "MQTTLogging.h" - -#if !defined(MQTTCLIENT_QOS1) - #define MQTTCLIENT_QOS1 1 -#endif -#if !defined(MQTTCLIENT_QOS2) - #define MQTTCLIENT_QOS2 0 -#endif - -#if 1 -extern RawSerial pc; - -#define DBG(x) x -#else -#define DBG(x) -#endif - -namespace MQTT -{ - - -enum QoS { QOS0, QOS1, QOS2 }; - -// all failure return codes must be negative -enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; - - -struct Message -{ - enum QoS qos; - bool retained; - bool dup; - unsigned short id; - void *payload; - size_t payloadlen; -}; - - -struct MessageData -{ - MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) - { } - - struct Message &message; - MQTTString &topicName; -}; - - -class PacketId -{ -public: - PacketId() - { - next = 0; - } - - int getNext() - { - return next = (next == MAX_PACKET_ID) ? 1 : ++next; - } - -private: - static const int MAX_PACKET_ID = 65535; - int next; -}; - - -/** - * @class Client - * @brief blocking, non-threaded MQTT client API - * - * This version of the API blocks on all method calls, until they are complete. This means that only one - * MQTT request can be in process at any one time. - * @param Network a network class which supports send, receive - * @param Timer a timer class with the methods: - */ -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 256, int MAX_MESSAGE_HANDLERS = 5> -class Client -{ - -public: - - typedef void (*messageHandler)(MessageData&); - - /** Construct the client - * @param network - pointer to an instance of the Network class - must be connected to the endpoint - * before calling MQTT connect - * @param limits an instance of the Limit class - to alter limits as required - */ - Client(Network& network, unsigned int command_timeout_ms = 30000); - - /** Set the default message handling callback - used for any message which does not match a subscription message handler - * @param mh - pointer to the callback function - */ - void setDefaultMessageHandler(messageHandler mh) - { - defaultMessageHandler.attach(mh); - } - - /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack - * The nework object must be connected to the network endpoint before calling this - * Default connect options are used - * @return success code - - */ - int connect(); - - /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack - * The nework object must be connected to the network endpoint before calling this - * @param options - connect options - * @return success code - - */ - int connect(MQTTPacket_connectData& options); - - /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs - * @param topic - the topic to publish to - * @param message - the message to send - * @return success code - - */ - int publish(const char* topicName, Message& message); - - /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs - * @param topic - the topic to publish to - * @param payload - the data to send - * @param payloadlen - the length of the data - * @param qos - the QoS to send the publish at - * @param retained - whether the message should be retained - * @return success code - - */ - int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); - - /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs - * @param topic - the topic to publish to - * @param payload - the data to send - * @param payloadlen - the length of the data - * @param id - the packet id used - returned - * @param qos - the QoS to send the publish at - * @param retained - whether the message should be retained - * @return success code - - */ - int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); - - /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback - * @param topicFilter - a topic pattern which can include wildcards - * @param qos - the MQTT QoS to subscribe at - * @param mh - the callback function to be invoked when a message is received for this subscription - * @return success code - - */ - int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); - - /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback - * @param topicFilter - a topic pattern which can include wildcards - * @return success code - - */ - int unsubscribe(const char* topicFilter); - - /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state - * @return success code - - */ - int disconnect(); - - /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive - * yield can be called if no other MQTT operation is needed. This will also allow messages to be - * received. - * @param timeout_ms the time to wait, in milliseconds - * @return success code - on failure, this means the client has disconnected - */ - int yield(unsigned long timeout_ms = 1000L); - - /** Is the client connected? - * @return flag - is the client connected or not? - */ - bool isConnected() - { - return isconnected; - } - -private: - - void cleanSession(); - int cycle(Timer& timer); - int waitfor(int packet_type, Timer& timer); - int keepalive(); - int publish(int len, Timer& timer, enum QoS qos); - - int decodePacket(int* value, int timeout); - int readPacket(Timer& timer); - int sendPacket(int length, Timer& timer); - int deliverMessage(MQTTString& topicName, Message& message); - bool isTopicMatched(char* topicFilter, MQTTString& topicName); - - Network& ipstack; - unsigned long command_timeout_ms; - - unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; - unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; - - Timer last_sent, last_received; - unsigned int keepAliveInterval; - bool ping_outstanding; - bool cleansession; - - PacketId packetid; - - struct MessageHandlers - { - const char* topicFilter; - FP_<void, MessageData&> fp; - } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic - - FP_<void, MessageData&> defaultMessageHandler; - - bool isconnected; - -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect - int inflightLen; - unsigned short inflightMsgid; - enum QoS inflightQoS; -#endif - -#if MQTTCLIENT_QOS2 - bool pubrel; - #if !defined(MAX_INCOMING_QOS2_MESSAGES) - #define MAX_INCOMING_QOS2_MESSAGES 10 - #endif - unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; - bool isQoS2msgidFree(unsigned short id); - bool useQoS2msgid(unsigned short id); - void freeQoS2msgid(unsigned short id); -#endif - -}; - -} - - -template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> -void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() -{ - ping_outstanding = false; - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - messageHandlers[i].topicFilter = 0; - isconnected = false; - -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - inflightMsgid = 0; - inflightQoS = QOS0; -#endif - -#if MQTTCLIENT_QOS2 - pubrel = false; - for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) - incomingQoS2messages[i] = 0; -#endif -} - - -template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> -MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() -{ - last_sent = Timer(); - last_received = Timer(); - this->command_timeout_ms = command_timeout_ms; - cleanSession(); -} - - -#if MQTTCLIENT_QOS2 -template<class Network, class Timer, int a, int b> -bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) -{ - for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) - { - if (incomingQoS2messages[i] == id) - return false; - } - return true; -} - - -template<class Network, class Timer, int a, int b> -bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) -{ - for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) - { - if (incomingQoS2messages[i] == 0) - { - incomingQoS2messages[i] = id; - return true; - } - } - return false; -} - - -template<class Network, class Timer, int a, int b> -void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) -{ - for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) - { - if (incomingQoS2messages[i] == id) - { - incomingQoS2messages[i] = 0; - return; - } - } -} -#endif - - -template<class Network, class Timer, int a, int b> -int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) -{ - int rc = FAILURE, - sent = 0; - - while (sent < length && !timer.expired()) - { - rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); - if (rc < 0) // there was an error writing the data - break; - sent += rc; - } - if (sent == length) - { - if (this->keepAliveInterval > 0) - last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet - rc = SUCCESS; - } - else - rc = FAILURE; - -#if defined(MQTT_DEBUG) - char printbuf[150]; - DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); -#endif - return rc; -} - - -template<class Network, class Timer, int a, int b> -int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) -{ - unsigned char c; - int multiplier = 1; - int len = 0; - const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; - - *value = 0; - do - { - int rc = MQTTPACKET_READ_ERROR; - - if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) - { - rc = MQTTPACKET_READ_ERROR; /* bad data */ - goto exit; - } - rc = ipstack.read(&c, 1, timeout); - if (rc != 1) - goto exit; - *value += (c & 127) * multiplier; - multiplier *= 128; - } while ((c & 128) != 0); -exit: - return len; -} - - -/** - * If any read fails in this method, then we should disconnect from the network, as on reconnect - * the packets can be retried. - * @param timeout the max time to wait for the packet read to complete, in milliseconds - * @return the MQTT packet type, or -1 if none - */ -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) -{ - int rc = FAILURE; - MQTTHeader header = {0}; - int len = 0; - int rem_len = 0; - - /* 1. read the header byte. This has the packet type in it */ - if (ipstack.read(readbuf, 1, timer.left_ms()) != 1) - goto exit; - - len = 1; - /* 2. read the remaining length. This is variable in itself */ - decodePacket(&rem_len, timer.left_ms()); - len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ - - if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) - { - rc = BUFFER_OVERFLOW; - goto exit; - } - - /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) - goto exit; - - header.byte = readbuf[0]; - rc = header.bits.type; - if (this->keepAliveInterval > 0) - last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet -exit: - -#if defined(MQTT_DEBUG) - if (rc >= 0) - { - char printbuf[50]; - DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); - } -#endif - return rc; -} - - -// assume topic filter and name is in correct format -// # can only be at end -// + and # can only be next to separator -template<class Network, class Timer, int a, int b> -bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) -{ - char* curf = topicFilter; - char* curn = topicName.lenstring.data; - char* curn_end = curn + topicName.lenstring.len; - - while (*curf && curn < curn_end) - { - if (*curn == '/' && *curf != '/') - break; - if (*curf != '+' && *curf != '#' && *curf != *curn) - break; - if (*curf == '+') - { // skip until we meet the next separator, or end of string - char* nextpos = curn + 1; - while (nextpos < curn_end && *nextpos != '/') - nextpos = ++curn + 1; - } - else if (*curf == '#') - curn = curn_end - 1; // skip until end of string - curf++; - curn++; - }; - - return (curn == curn_end) && (*curf == '\0'); -} - - - -template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> -int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) -{ - int rc = FAILURE; - - // we have to find the right message handler - indexed by topic - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || - isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) - { - if (messageHandlers[i].fp.attached()) - { - MessageData md(topicName, message); - messageHandlers[i].fp(md); - rc = SUCCESS; - } - } - } - - if (rc == FAILURE && defaultMessageHandler.attached()) - { - MessageData md(topicName, message); - defaultMessageHandler(md); - rc = SUCCESS; - } - - return rc; -} - - - -template<class Network, class Timer, int a, int b> -int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) -{ - int rc = SUCCESS; - Timer timer = Timer(); - - timer.countdown_ms(timeout_ms); - while (!timer.expired()) - { - if (cycle(timer) < 0) - { - rc = FAILURE; - break; - } - Thread::wait(10); - } - - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) -{ - /* get one piece of work off the wire and one pass through */ - - // read the socket, see what work is due - int packet_type = readPacket(timer); - - int len = 0, - rc = SUCCESS; - - switch (packet_type) - { - case FAILURE: - case BUFFER_OVERFLOW: - rc = packet_type; - break; - case CONNACK: - case PUBACK: - case SUBACK: - break; - case PUBLISH: - { - MQTTString topicName = MQTTString_initializer; - Message msg; - int intQoS; - if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, - (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) - goto exit; - msg.qos = (enum QoS)intQoS; -#if MQTTCLIENT_QOS2 - if (msg.qos != QOS2) -#endif - deliverMessage(topicName, msg); -#if MQTTCLIENT_QOS2 - else if (isQoS2msgidFree(msg.id)) - { - if (useQoS2msgid(msg.id)) - deliverMessage(topicName, msg); - else - WARN("Maximum number of incoming QoS2 messages exceeded"); - } -#endif -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - if (msg.qos != QOS0) - { - if (msg.qos == QOS1) - len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); - else if (msg.qos == QOS2) - len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); - if (len <= 0) - rc = FAILURE; - else - rc = sendPacket(len, timer); - if (rc == FAILURE) - goto exit; // there was a problem - } - break; -#endif - } -#if MQTTCLIENT_QOS2 - case PUBREC: - case PUBREL: - unsigned short mypacketid; - unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) - rc = FAILURE; - else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, - (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) - rc = FAILURE; - else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet - rc = FAILURE; // there was a problem - if (rc == FAILURE) - goto exit; // there was a problem - if (packet_type == PUBREL) - freeQoS2msgid(mypacketid); - break; - - case PUBCOMP: - break; -#endif - case PINGRESP: - ping_outstanding = false; - break; - } - keepalive(); -exit: - if (rc == SUCCESS) - rc = packet_type; - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() -{ - int rc = FAILURE; - - if (keepAliveInterval == 0) - { - rc = SUCCESS; - goto exit; - } - - if (last_sent.expired() || last_received.expired()) - { - if (!ping_outstanding) - { - Timer timer(1000); - int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); - if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet - ping_outstanding = true; - } - } - -exit: - return rc; -} - - -// only used in single-threaded mode where one command at a time is in process -template<class Network, class Timer, int a, int b> -int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) -{ - int rc = FAILURE; - - do - { - if (timer.expired()) - break; // we timed out - } - while ((rc = cycle(timer)) != packet_type); - - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) -{ - Timer connect_timer(command_timeout_ms); - int rc = FAILURE; - int len = 0; - - if (isconnected) // don't send connect packet again if we are already connected - goto exit; - - this->keepAliveInterval = options.keepAliveInterval; - this->cleansession = options.cleansession; - if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) - goto exit; - if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet - goto exit; // there was a problem - - if (this->keepAliveInterval > 0) - last_received.countdown(this->keepAliveInterval); - // this will be a blocking call, wait for the connack - if (waitfor(CONNACK, connect_timer) == CONNACK) - { - unsigned char connack_rc = 255; - bool sessionPresent = false; - if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = connack_rc; - else - rc = FAILURE; - } - else - rc = FAILURE; - -#if MQTTCLIENT_QOS2 - // resend any inflight publish - if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) - { - if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) - rc = FAILURE; - else - rc = publish(len, connect_timer, inflightQoS); - } - else -#endif -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - if (inflightMsgid > 0) - { - memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); - rc = publish(inflightLen, connect_timer, inflightQoS); - } -#endif - -exit: - if (rc == SUCCESS) - isconnected = true; - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() -{ - MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; - return connect(default_options); -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) -{ - int rc = FAILURE; - Timer timer(command_timeout_ms); - int len = 0; - MQTTString topic = {(char*)topicFilter, {0, 0}}; - - if (!isconnected) - goto exit; - - len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); - if (len <= 0) - goto exit; - if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet - goto exit; // there was a problem - - if (waitfor(SUBACK, timer) == SUBACK) // wait for suback - { - int count = 0, grantedQoS = -1; - unsigned short mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = grantedQoS; // 0, 1, 2 or 0x80 - if (rc != 0x80) - { - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topicFilter == 0) - { - messageHandlers[i].topicFilter = topicFilter; - messageHandlers[i].fp.attach(messageHandler); - rc = 0; - break; - } - } - } - } - else - rc = FAILURE; - -exit: - if (rc != SUCCESS) - cleanSession(); - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) -{ - int rc = FAILURE; - Timer timer(command_timeout_ms); - MQTTString topic = {(char*)topicFilter, {0, 0}}; - int len = 0; - - if (!isconnected) - goto exit; - - if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) - goto exit; - if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet - goto exit; // there was a problem - - if (waitfor(UNSUBACK, timer) == UNSUBACK) - { - unsigned short mypacketid; // should be the same as the packetid above - if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - { - rc = 0; - - // remove the subscription message handler associated with this topic, if there is one - for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) - { - messageHandlers[i].topicFilter = 0; - break; - } - } - } - } - else - rc = FAILURE; - -exit: - if (rc != SUCCESS) - cleanSession(); - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) -{ - int rc; - - if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet - goto exit; // there was a problem - -#if MQTTCLIENT_QOS1 - if (qos == QOS1) - { - if (waitfor(PUBACK, timer) == PUBACK) - { - unsigned short mypacketid; - unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) - rc = FAILURE; - else if (inflightMsgid == mypacketid) - inflightMsgid = 0; - } - else - rc = FAILURE; - } -#elif MQTTCLIENT_QOS2 - else if (qos == QOS2) - { - if (waitfor(PUBCOMP, timer) == PUBCOMP) - { - unsigned short mypacketid; - unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) - rc = FAILURE; - else if (inflightMsgid == mypacketid) - inflightMsgid = 0; - } - else - rc = FAILURE; - } -#endif - -exit: - if (rc != SUCCESS) - cleanSession(); - return rc; -} - - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) -{ - int rc = FAILURE; - Timer timer(command_timeout_ms); - MQTTString topicString = MQTTString_initializer; - int len = 0; - - if (!isconnected) - goto exit; - - topicString.cstring = (char*)topicName; - -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - if (qos == QOS1 || qos == QOS2) - id = packetid.getNext(); -#endif - - len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, - topicString, (unsigned char*)payload, payloadlen); - if (len <= 0) - goto exit; - -#if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 - if (!cleansession) - { - memcpy(pubbuf, sendbuf, len); - inflightMsgid = id; - inflightLen = len; - inflightQoS = qos; -#if MQTTCLIENT_QOS2 - pubrel = false; -#endif - } -#endif - - rc = publish(len, timer, qos); -exit: - return rc; -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) -{ - unsigned short id = 0; // dummy - not used for anything - return publish(topicName, payload, payloadlen, id, qos, retained); -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) -{ - return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); -} - - -template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> -int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() -{ - int rc = FAILURE; - Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete - int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); - if (len > 0) - rc = sendPacket(len, timer); // send the disconnect packet - - if (cleansession) - cleanSession(); - else - isconnected = false; - return rc; -} - - -#endif \ No newline at end of file
--- a/MQTT/MQTTEthernet.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,35 +0,0 @@ - -#if !defined(MQTTETHERNET_H) -#define MQTTETHERNET_H - -#include "MQTTmbed.h" -#include "EthernetInterface.h" -#include "MQTTSocket.h" - -class MQTTEthernet : public MQTTSocket -{ -public: - MQTTEthernet() - { - eth.init(); // Use DHCP - eth.connect(); - } - - EthernetInterface& getEth() - { - return eth; - } - - void reconnect() - { - eth.connect(); // nothing I've tried actually works to reconnect - } - -private: - - EthernetInterface eth; - -}; - - -#endif
--- a/MQTT/MQTTLogging.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,39 +0,0 @@ -#if !defined(MQTT_LOGGING_H) -#define MQTT_LOGGING_H - -#define STREAM stdout -#if !defined(DEBUG) -#define DEBUG(...) \ - {\ - fprintf(STREAM, "DEBUG: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \ - fprintf(STREAM, ##__VA_ARGS__); \ - fflush(STREAM); \ - } -#endif -#if !defined(LOG) -#define LOG(...) \ - {\ - fprintf(STREAM, "LOG: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \ - fprintf(STREAM, ##__VA_ARGS__); \ - fflush(STREAM); \ - } -#endif -#if !defined(WARN) -#define WARN(...) \ - { \ - fprintf(STREAM, "WARN: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \ - fprintf(STREAM, ##__VA_ARGS__); \ - fflush(STREAM); \ - } -#endif -#if !defined(ERROR) -#define ERROR(...) \ - { \ - fprintf(STREAM, "ERROR: %s L#%d ", __PRETTY_FUNCTION__, __LINE__); \ - fprintf(STREAM, ##__VA_ARGS__); \ - fflush(STREAM); \ - exit(1); \ - } -#endif - -#endif \ No newline at end of file
--- a/MQTT/MQTTPacket/MQTTConnect.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,139 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014, 2015 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - * Ian Craggs - add connack return code definitions - *******************************************************************************/ - -#ifndef MQTTCONNECT_H_ -#define MQTTCONNECT_H_ - -enum connack_return_codes -{ - MQTT_CONNECTION_ACCEPTED = 0, - MQTT_UNNACCEPTABLE_PROTOCOL = 1, - MQTT_CLIENTID_REJECTED = 2, - MQTT_SERVER_UNAVAILABLE = 3, - MQTT_BAD_USERNAME_OR_PASSWORD = 4, - MQTT_NOT_AUTHORIZED = 5, -}; - - -typedef union -{ - unsigned char all; /**< all connect flags */ -#if defined(REVERSED) - struct - { - unsigned int username : 1; /**< 3.1 user name */ - unsigned int password : 1; /**< 3.1 password */ - unsigned int willRetain : 1; /**< will retain setting */ - unsigned int willQoS : 2; /**< will QoS value */ - unsigned int will : 1; /**< will flag */ - unsigned int cleansession : 1; /**< clean session flag */ - unsigned int : 1; /**< unused */ - } bits; -#else - struct - { - unsigned int : 1; /**< unused */ - unsigned int cleansession : 1; /**< cleansession flag */ - unsigned int will : 1; /**< will flag */ - unsigned int willQoS : 2; /**< will QoS value */ - unsigned int willRetain : 1; /**< will retain setting */ - unsigned int password : 1; /**< 3.1 password */ - unsigned int username : 1; /**< 3.1 user name */ - } bits; -#endif -} MQTTConnectFlags; /**< connect flags byte */ - - - -/** - * Defines the MQTT "Last Will and Testament" (LWT) settings for - * the connect packet. - */ -typedef struct -{ - /** The eyecatcher for this structure. must be MQTW. */ - char struct_id[4]; - /** The version number of this structure. Must be 0 */ - int struct_version; - /** The LWT topic to which the LWT message will be published. */ - MQTTString topicName; - /** The LWT payload. */ - MQTTString message; - /** - * The retained flag for the LWT message (see MQTTAsync_message.retained). - */ - unsigned char retained; - /** - * The quality of service setting for the LWT message (see - * MQTTAsync_message.qos and @ref qos). - */ - char qos; -} MQTTPacket_willOptions; - - -#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 } - - -typedef struct -{ - /** The eyecatcher for this structure. must be MQTC. */ - char struct_id[4]; - /** The version number of this structure. Must be 0 */ - int struct_version; - /** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 - */ - unsigned char MQTTVersion; - MQTTString clientID; - unsigned short keepAliveInterval; - unsigned char cleansession; - unsigned char willFlag; - MQTTPacket_willOptions will; - MQTTString username; - MQTTString password; -} MQTTPacket_connectData; - -typedef union -{ - unsigned char all; /**< all connack flags */ -#if defined(REVERSED) - struct - { - unsigned int sessionpresent : 1; /**< session present flag */ - unsigned int : y; /**< unused */ - } bits; -#else - struct - { - unsigned int : 7; /**< unused */ - unsigned int sessionpresent : 1; /**< session present flag */ - } bits; -#endif -} MQTTConnackFlags; /**< connack flags byte */ - -#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \ - MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} } - -int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options); -int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len); - -int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent); -int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen); - -int MQTTSerialize_disconnect(unsigned char* buf, int buflen); -int MQTTSerialize_pingreq(unsigned char* buf, int buflen); - -#endif /* MQTTCONNECT_H_ */
--- a/MQTT/MQTTPacket/MQTTConnectClient.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,215 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - -/** - * Determines the length of the MQTT connect packet that would be produced using the supplied connect options. - * @param options the options to be used to build the connect packet - * @return the length of buffer needed to contain the serialized version of the packet - */ -int MQTTSerialize_connectLength(MQTTPacket_connectData* options) -{ - int len = 0; - - FUNC_ENTRY; - - if (options->MQTTVersion == 3) - len = 12; /* variable depending on MQTT or MQIsdp */ - else if (options->MQTTVersion == 4) - len = 10; - - len += MQTTstrlen(options->clientID)+2; - if (options->willFlag) - len += MQTTstrlen(options->will.topicName)+2 + MQTTstrlen(options->will.message)+2; - if (options->username.cstring || options->username.lenstring.data) - len += MQTTstrlen(options->username)+2; - if (options->password.cstring || options->password.lenstring.data) - len += MQTTstrlen(options->password)+2; - - FUNC_EXIT_RC(len); - return len; -} - - -/** - * Serializes the connect options into the buffer. - * @param buf the buffer into which the packet will be serialized - * @param len the length in bytes of the supplied buffer - * @param options the options to be used to build the connect packet - * @return serialized length, or error if 0 - */ -int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options) -{ - unsigned char *ptr = buf; - MQTTHeader header = {0}; - MQTTConnectFlags flags = {0}; - int len = 0; - int rc = -1; - - FUNC_ENTRY; - if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - - header.byte = 0; - header.bits.type = CONNECT; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, len); /* write remaining length */ - - if (options->MQTTVersion == 4) - { - writeCString(&ptr, "MQTT"); - writeChar(&ptr, (char) 4); - } - else - { - writeCString(&ptr, "MQIsdp"); - writeChar(&ptr, (char) 3); - } - - flags.all = 0; - flags.bits.cleansession = options->cleansession; - flags.bits.will = (options->willFlag) ? 1 : 0; - if (flags.bits.will) - { - flags.bits.willQoS = options->will.qos; - flags.bits.willRetain = options->will.retained; - } - - if (options->username.cstring || options->username.lenstring.data) - flags.bits.username = 1; - if (options->password.cstring || options->password.lenstring.data) - flags.bits.password = 1; - - writeChar(&ptr, flags.all); - writeInt(&ptr, options->keepAliveInterval); - writeMQTTString(&ptr, options->clientID); - if (options->willFlag) - { - writeMQTTString(&ptr, options->will.topicName); - writeMQTTString(&ptr, options->will.message); - } - if (flags.bits.username) - writeMQTTString(&ptr, options->username); - if (flags.bits.password) - writeMQTTString(&ptr, options->password); - - rc = ptr - buf; - - exit: FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Deserializes the supplied (wire) buffer into connack data - return code - * @param sessionPresent the session present flag returned (only for MQTT 3.1.1) - * @param connack_rc returned integer value of the connack return code - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param len the length in bytes of the data in the supplied buffer - * @return error code. 1 is success, 0 is failure - */ -int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = 0; - int mylen; - MQTTConnackFlags flags = {0}; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != CONNACK) - goto exit; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - if (enddata - curdata < 2) - goto exit; - - flags.all = readChar(&curdata); - *sessionPresent = flags.bits.sessionpresent; - *connack_rc = readChar(&curdata); - - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - - -/** - * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer, to avoid overruns - * @param packettype the message type - * @return serialized length, or error if 0 - */ -int MQTTSerialize_zero(unsigned char* buf, int buflen, unsigned char packettype) -{ - MQTTHeader header = {0}; - int rc = -1; - unsigned char *ptr = buf; - - FUNC_ENTRY; - if (buflen < 2) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - header.byte = 0; - header.bits.type = packettype; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */ - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer, to avoid overruns - * @return serialized length, or error if 0 - */ -int MQTTSerialize_disconnect(unsigned char* buf, int buflen) -{ - return MQTTSerialize_zero(buf, buflen, DISCONNECT); -} - - -/** - * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer, to avoid overruns - * @return serialized length, or error if 0 - */ -int MQTTSerialize_pingreq(unsigned char* buf, int buflen) -{ - return MQTTSerialize_zero(buf, buflen, PINGREQ); -}
--- a/MQTT/MQTTPacket/MQTTConnectServer.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,148 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "StackTrace.h" -#include "MQTTPacket.h" -#include <string.h> - -#define min(a, b) ((a < b) ? a : b) - - -/** - * Validates MQTT protocol name and version combinations - * @param protocol the MQTT protocol name as an MQTTString - * @param version the MQTT protocol version number, as in the connect packet - * @return correct MQTT combination? 1 is true, 0 is false - */ -int MQTTPacket_checkVersion(MQTTString* protocol, int version) -{ - int rc = 0; - - if (version == 3 && memcmp(protocol->lenstring.data, "MQIdsp", - min(6, protocol->lenstring.len)) == 0) - rc = 1; - else if (version == 4 && memcmp(protocol->lenstring.data, "MQTT", - min(4, protocol->lenstring.len)) == 0) - rc = 1; - return rc; -} - - -/** - * Deserializes the supplied (wire) buffer into connect data structure - * @param data the connect data structure to be filled out - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param len the length in bytes of the data in the supplied buffer - * @return error code. 1 is success, 0 is failure - */ -int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len) -{ - MQTTHeader header = {0}; - MQTTConnectFlags flags = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = &buf[len]; - int rc = 0; - MQTTString Protocol; - int version; - int mylen = 0; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != CONNECT) - goto exit; - - curdata += MQTTPacket_decodeBuf(curdata, &mylen); /* read remaining length */ - - if (!readMQTTLenString(&Protocol, &curdata, enddata) || - enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */ - goto exit; - - version = (int)readChar(&curdata); /* Protocol version */ - /* If we don't recognize the protocol version, we don't parse the connect packet on the - * basis that we don't know what the format will be. - */ - if (MQTTPacket_checkVersion(&Protocol, version)) - { - flags.all = readChar(&curdata); - data->cleansession = flags.bits.cleansession; - data->keepAliveInterval = readInt(&curdata); - if (!readMQTTLenString(&data->clientID, &curdata, enddata)) - goto exit; - if (flags.bits.will) - { - data->willFlag = 1; - data->will.qos = flags.bits.willQoS; - data->will.retained = flags.bits.willRetain; - if (!readMQTTLenString(&data->will.topicName, &curdata, enddata) || - !readMQTTLenString(&data->will.message, &curdata, enddata)) - goto exit; - } - if (flags.bits.username) - { - if (enddata - curdata < 3 || !readMQTTLenString(&data->username, &curdata, enddata)) - goto exit; /* username flag set, but no username supplied - invalid */ - if (flags.bits.password && - (enddata - curdata < 3 || !readMQTTLenString(&data->password, &curdata, enddata))) - goto exit; /* password flag set, but no password supplied - invalid */ - } - else if (flags.bits.password) - goto exit; /* password flag set without username - invalid */ - rc = 1; - } -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Serializes the connack packet into the supplied buffer. - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param connack_rc the integer connack return code to be used - * @param sessionPresent the MQTT 3.1.1 sessionPresent flag - * @return serialized length, or error if 0 - */ -int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent) -{ - MQTTHeader header = {0}; - int rc = 0; - unsigned char *ptr = buf; - MQTTConnackFlags flags = {0}; - - FUNC_ENTRY; - if (buflen < 2) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - header.byte = 0; - header.bits.type = CONNACK; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ - - flags.all = 0; - flags.bits.sessionpresent = sessionPresent; - writeChar(&ptr, flags.all); - writeChar(&ptr, connack_rc); - - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} -
--- a/MQTT/MQTTPacket/MQTTDeserializePublish.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,107 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "StackTrace.h" -#include "MQTTPacket.h" -#include <string.h> - -#define min(a, b) ((a < b) ? 1 : 0) - -/** - * Deserializes the supplied (wire) buffer into publish data - * @param dup returned integer - the MQTT dup flag - * @param qos returned integer - the MQTT QoS value - * @param retained returned integer - the MQTT retained flag - * @param packetid returned integer - the MQTT packet identifier - * @param topicName returned MQTTString - the MQTT topic in the publish - * @param payload returned byte buffer - the MQTT publish payload - * @param payloadlen returned integer - the length of the MQTT payload - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return error code. 1 is success - */ -int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, - unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = 0; - int mylen = 0; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != PUBLISH) - goto exit; - *dup = header.bits.dup; - *qos = header.bits.qos; - *retained = header.bits.retain; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - - if (!readMQTTLenString(topicName, &curdata, enddata) || - enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */ - goto exit; - - if (*qos > 0) - *packetid = readInt(&curdata); - - *payloadlen = enddata - curdata; - *payload = curdata; - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - - -/** - * Deserializes the supplied (wire) buffer into an ack - * @param packettype returned integer - the MQTT packet type - * @param dup returned integer - the MQTT dup flag - * @param packetid returned integer - the MQTT packet identifier - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return error code. 1 is success, 0 is failure - */ -int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = 0; - int mylen; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - *dup = header.bits.dup; - *packettype = header.bits.type; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - - if (enddata - curdata < 2) - goto exit; - *packetid = readInt(&curdata); - - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} -
--- a/MQTT/MQTTPacket/MQTTPacket.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,453 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "StackTrace.h" -#include "MQTTPacket.h" - -#include <string.h> - -/** - * Encodes the message length according to the MQTT algorithm - * @param buf the buffer into which the encoded data is written - * @param length the length to be encoded - * @return the number of bytes written to buffer - */ -int MQTTPacket_encode(unsigned char* buf, int length) -{ - int rc = 0; - - FUNC_ENTRY; - do - { - char d = length % 128; - length /= 128; - /* if there are more digits to encode, set the top bit of this digit */ - if (length > 0) - d |= 0x80; - buf[rc++] = d; - } while (length > 0); - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Decodes the message length according to the MQTT algorithm - * @param getcharfn pointer to function to read the next character from the data source - * @param value the decoded length returned - * @return the number of bytes read from the socket - */ -int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value) -{ - unsigned char c; - int multiplier = 1; - int len = 0; -#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 - - FUNC_ENTRY; - *value = 0; - do - { - int rc = MQTTPACKET_READ_ERROR; - - if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) - { - rc = MQTTPACKET_READ_ERROR; /* bad data */ - goto exit; - } - rc = (*getcharfn)(&c, 1); - if (rc != 1) - goto exit; - *value += (c & 127) * multiplier; - multiplier *= 128; - } while ((c & 128) != 0); -exit: - FUNC_EXIT_RC(len); - return len; -} - - -int MQTTPacket_len(int rem_len) -{ - rem_len += 1; /* header byte */ - - /* now remaining_length field */ - if (rem_len < 128) - rem_len += 1; - else if (rem_len < 16384) - rem_len += 2; - else if (rem_len < 2097151) - rem_len += 3; - else - rem_len += 4; - return rem_len; -} - - -static unsigned char* bufptr; - -int bufchar(unsigned char* c, int count) -{ - int i; - - for (i = 0; i < count; ++i) - *c = *bufptr++; - return count; -} - - -int MQTTPacket_decodeBuf(unsigned char* buf, int* value) -{ - bufptr = buf; - return MQTTPacket_decode(bufchar, value); -} - - -/** - * Calculates an integer from two bytes read from the input buffer - * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned - * @return the integer value calculated - */ -int readInt(unsigned char** pptr) -{ - unsigned char* ptr = *pptr; - int len = 256*(*ptr) + (*(ptr+1)); - *pptr += 2; - return len; -} - - -/** - * Reads one character from the input buffer. - * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned - * @return the character read - */ -char readChar(unsigned char** pptr) -{ - char c = **pptr; - (*pptr)++; - return c; -} - - -/** - * Writes one character to an output buffer. - * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned - * @param c the character to write - */ -void writeChar(unsigned char** pptr, char c) -{ - **pptr = c; - (*pptr)++; -} - - -/** - * Writes an integer as 2 bytes to an output buffer. - * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned - * @param anInt the integer to write - */ -void writeInt(unsigned char** pptr, int anInt) -{ - **pptr = (unsigned char)(anInt / 256); - (*pptr)++; - **pptr = (unsigned char)(anInt % 256); - (*pptr)++; -} - - -/** - * Writes a "UTF" string to an output buffer. Converts C string to length-delimited. - * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned - * @param string the C string to write - */ -void writeCString(unsigned char** pptr, const char* string) -{ - int len = strlen(string); - writeInt(pptr, len); - memcpy(*pptr, string, len); - *pptr += len; -} - - -int getLenStringLen(char* ptr) -{ - int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1)); - return len; -} - - -void writeMQTTString(unsigned char** pptr, MQTTString mqttstring) -{ - if (mqttstring.lenstring.len > 0) - { - writeInt(pptr, mqttstring.lenstring.len); - memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len); - *pptr += mqttstring.lenstring.len; - } - else if (mqttstring.cstring) - writeCString(pptr, mqttstring.cstring); - else - writeInt(pptr, 0); -} - - -/** - * @param mqttstring the MQTTString structure into which the data is to be read - * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned - * @param enddata pointer to the end of the data: do not read beyond - * @return 1 if successful, 0 if not - */ -int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata) -{ - int rc = 0; - - FUNC_ENTRY; - /* the first two bytes are the length of the string */ - if (enddata - (*pptr) > 1) /* enough length to read the integer? */ - { - mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */ - if (&(*pptr)[mqttstring->lenstring.len] <= enddata) - { - mqttstring->lenstring.data = (char*)*pptr; - *pptr += mqttstring->lenstring.len; - rc = 1; - } - } - mqttstring->cstring = NULL; - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string - * @param mqttstring the string to return the length of - * @return the length of the string - */ -int MQTTstrlen(MQTTString mqttstring) -{ - int rc = 0; - - if (mqttstring.cstring) - rc = strlen(mqttstring.cstring); - else - rc = mqttstring.lenstring.len; - return rc; -} - - -/** - * Compares an MQTTString to a C string - * @param a the MQTTString to compare - * @param bptr the C string to compare - * @return boolean - equal or not - */ -int MQTTPacket_equals(MQTTString* a, char* bptr) -{ - int alen = 0, - blen = 0; - char *aptr; - - if (a->cstring) - { - aptr = a->cstring; - alen = strlen(a->cstring); - } - else - { - aptr = a->lenstring.data; - alen = a->lenstring.len; - } - blen = strlen(bptr); - - return (alen == blen) && (strncmp(aptr, bptr, alen) == 0); -} - - -/** - * Helper function to read packet data from some source into a buffer - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param getfn pointer to a function which will read any number of bytes from the needed source - * @return integer MQTT packet type, or -1 on error - */ -int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)) -{ - int rc = -1; - MQTTHeader header = {0}; - int len = 0; - int rem_len = 0; - - /* 1. read the header byte. This has the packet type in it */ - if ((*getfn)(buf, 1) != 1) - goto exit; - - len = 1; - /* 2. read the remaining length. This is variable in itself */ - MQTTPacket_decode(getfn, &rem_len); - len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */ - - /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if ((*getfn)(buf + len, rem_len) != rem_len) - goto exit; - - header.byte = buf[0]; - rc = header.bits.type; -exit: - return rc; -} - - -const char* MQTTPacket_names[] = -{ - "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", - "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", - "PINGREQ", "PINGRESP", "DISCONNECT" -}; - - -char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen) -{ - int index = 0; - int rem_length = 0; - MQTTHeader header = {0}; - int strindex = 0; - - header.byte = buf[index++]; - index += MQTTPacket_decodeBuf(&buf[index], &rem_length); - - switch (header.bits.type) - { - case CONNECT: - { - MQTTPacket_connectData data; - if (MQTTDeserialize_connect(&data, buf, buflen) == 1) - { - strindex = snprintf(strbuf, strbuflen, - "CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %hd", - (int)data.MQTTVersion, data.clientID.lenstring.len, data.clientID.lenstring.data, - (int)data.cleansession, data.keepAliveInterval); - if (data.willFlag) - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", will QoS %d, will retain %d, will topic %.*s, will message %.*s", - data.will.qos, data.will.retained, - data.will.topicName.lenstring.len, data.will.topicName.lenstring.data, - data.will.message.lenstring.len, data.will.message.lenstring.data); - if (data.username.lenstring.data && data.username.lenstring.len > 0) - { - printf("user name\n"); - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", user name %.*s", data.username.lenstring.len, data.username.lenstring.data); - } - if (data.password.lenstring.data && data.password.lenstring.len > 0) - strindex += snprintf(&strbuf[strindex], strbuflen - strindex, - ", password %.*s", data.password.lenstring.len, data.password.lenstring.data); - } - } - break; - case CONNACK: - { - unsigned char sessionPresent, connack_rc; - if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "CONNACK session present %d, rc %d", sessionPresent, connack_rc); - } - break; - case PUBLISH: - { - unsigned char dup, retained, *payload; - unsigned short packetid; - int qos, payloadlen; - MQTTString topicName = MQTTString_initializer; - if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, - &payload, &payloadlen, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s", - dup, qos, retained, packetid, - (topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data, - payloadlen, (payloadlen < 20) ? payloadlen : 20, payload); - } - break; - case PUBACK: - case PUBREC: - case PUBREL: - case PUBCOMP: - { - unsigned char packettype, dup; - unsigned short packetid; - if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "%s dup %d, packet id %d", - MQTTPacket_names[packettype], dup, packetid); - } - break; - case SUBSCRIBE: - { - unsigned char dup; - unsigned short packetid; - int maxcount = 1, count = 0; - MQTTString topicFilters[1]; - int requestedQoSs[1]; - if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count, - topicFilters, requestedQoSs, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d", - dup, packetid, count, - topicFilters[0].lenstring.len, topicFilters[0].lenstring.data, - requestedQoSs[0]); - } - break; - case SUBACK: - { - unsigned short packetid; - int maxcount = 1, count = 0; - int grantedQoSs[1]; - if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "SUBACK packet id %d count %d granted qos %d", - packetid, count, grantedQoSs[0]); - } - break; - case UNSUBSCRIBE: - { - unsigned char dup; - unsigned short packetid; - int maxcount = 1, count = 0; - MQTTString topicFilters[1]; - if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s", - dup, packetid, count, - topicFilters[0].lenstring.len, topicFilters[0].lenstring.data); - } - break; - case UNSUBACK: - { - unsigned short packetid; - if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1) - strindex = snprintf(strbuf, strbuflen, - "UNSUBACK packet id %d", packetid); - } - break; - case PINGREQ: - case PINGRESP: - case DISCONNECT: - strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); - break; - } - return strbuf; -}
--- a/MQTT/MQTTPacket/MQTTPacket.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,111 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#ifndef MQTTPACKET_H_ -#define MQTTPACKET_H_ - -#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */ -extern "C" { -#endif - -enum errors -{ - MQTTPACKET_BUFFER_TOO_SHORT = -2, - MQTTPACKET_READ_ERROR = -1, - MQTTPACKET_READ_COMPLETE, -}; - -enum msgTypes -{ - CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, - PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, - PINGREQ, PINGRESP, DISCONNECT -}; - -/** - * Bitfields for the MQTT header byte. - */ -typedef union -{ - unsigned char byte; /**< the whole byte */ -#if defined(REVERSED) - struct - { - unsigned int type : 4; /**< message type nibble */ - unsigned int dup : 1; /**< DUP flag bit */ - unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ - unsigned int retain : 1; /**< retained flag bit */ - } bits; -#else - struct - { - unsigned int retain : 1; /**< retained flag bit */ - unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ - unsigned int dup : 1; /**< DUP flag bit */ - unsigned int type : 4; /**< message type nibble */ - } bits; -#endif -} MQTTHeader; - -typedef struct -{ - int len; - char* data; -} MQTTLenString; - -typedef struct -{ - char* cstring; - MQTTLenString lenstring; -} MQTTString; - -#define MQTTString_initializer {NULL, {0, NULL}} - -int MQTTstrlen(MQTTString mqttstring); - -#include "MQTTConnect.h" -#include "MQTTPublish.h" -#include "MQTTSubscribe.h" -#include "MQTTUnsubscribe.h" - -int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid); -int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen); - -int MQTTPacket_len(int rem_len); -int MQTTPacket_equals(MQTTString* a, char* b); - -int MQTTPacket_encode(unsigned char* buf, int length); -int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value); -int MQTTPacket_decodeBuf(unsigned char* buf, int* value); - -int readInt(unsigned char** pptr); -char readChar(unsigned char** pptr); -void writeChar(unsigned char** pptr, char c); -void writeInt(unsigned char** pptr, int anInt); -int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata); -void writeCString(unsigned char** pptr, const char* string); -void writeMQTTString(unsigned char** pptr, MQTTString mqttstring); - -int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)); - -char* MQTTPacket_toString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); - -#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ -} -#endif - - -#endif /* MQTTPACKET_H_ */
--- a/MQTT/MQTTPacket/MQTTPublish.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#ifndef MQTTPUBLISH_H_ -#define MQTTPUBLISH_H_ - -int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, - MQTTString topicName, unsigned char* payload, int payloadlen); - -int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, - unsigned char** payload, int* payloadlen, unsigned char* buf, int len); - -int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid); -int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid); -int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid); - -#endif /* MQTTPUBLISH_H_ */
--- a/MQTT/MQTTPacket/MQTTSerializePublish.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,168 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - - -/** - * Determines the length of the MQTT publish packet that would be produced using the supplied parameters - * @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0) - * @param topicName the topic name to be used in the publish - * @param payloadlen the length of the payload to be sent - * @return the length of buffer needed to contain the serialized version of the packet - */ -int MQTTSerialize_publishLength(int qos, MQTTString topicName, int payloadlen) -{ - int len = 0; - - len += 2 + MQTTstrlen(topicName) + payloadlen; - if (qos > 0) - len += 2; /* packetid */ - return len; -} - - -/** - * Serializes the supplied publish data into the supplied buffer, ready for sending - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param dup integer - the MQTT dup flag - * @param qos integer - the MQTT QoS value - * @param retained integer - the MQTT retained flag - * @param packetid integer - the MQTT packet identifier - * @param topicName MQTTString - the MQTT topic in the publish - * @param payload byte buffer - the MQTT publish payload - * @param payloadlen integer - the length of the MQTT payload - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, - MQTTString topicName, unsigned char* payload, int payloadlen) -{ - unsigned char *ptr = buf; - MQTTHeader header = {0}; - int rem_len = 0; - int rc = 0; - - FUNC_ENTRY; - if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - - header.bits.type = PUBLISH; - header.bits.dup = dup; - header.bits.qos = qos; - header.bits.retain = retained; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; - - writeMQTTString(&ptr, topicName); - - if (qos > 0) - writeInt(&ptr, packetid); - - memcpy(ptr, payload, payloadlen); - ptr += payloadlen; - - rc = ptr - buf; - -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - - -/** - * Serializes the ack packet into the supplied buffer. - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param type the MQTT packet type - * @param dup the MQTT dup flag - * @param packetid the MQTT packet identifier - * @return serialized length, or error if 0 - */ -int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char packettype, unsigned char dup, unsigned short packetid) -{ - MQTTHeader header = {0}; - int rc = 0; - unsigned char *ptr = buf; - - FUNC_ENTRY; - if (buflen < 4) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - header.bits.type = packettype; - header.bits.dup = dup; - header.bits.qos = 0; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ - writeInt(&ptr, packetid); - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Serializes a puback packet into the supplied buffer. - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param packetid integer - the MQTT packet identifier - * @return serialized length, or error if 0 - */ -int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid) -{ - return MQTTSerialize_ack(buf, buflen, PUBACK, packetid, 0); -} - - -/** - * Serializes a pubrel packet into the supplied buffer. - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param dup integer - the MQTT dup flag - * @param packetid integer - the MQTT packet identifier - * @return serialized length, or error if 0 - */ -int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid) -{ - return MQTTSerialize_ack(buf, buflen, PUBREL, packetid, dup); -} - - -/** - * Serializes a pubrel packet into the supplied buffer. - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param packetid integer - the MQTT packet identifier - * @return serialized length, or error if 0 - */ -int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid) -{ - return MQTTSerialize_ack(buf, buflen, PUBCOMP, packetid, 0); -} - -
--- a/MQTT/MQTTPacket/MQTTSubscribe.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#ifndef MQTTSUBSCRIBE_H_ -#define MQTTSUBSCRIBE_H_ - -int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, - int count, MQTTString topicFilters[], int requestedQoSs[]); - -int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, - int maxcount, int* count, MQTTString topicFilters[], int requestedQoSs[], unsigned char* buf, int len); - -int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs); - -int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int len); - - -#endif /* MQTTSUBSCRIBE_H_ */
--- a/MQTT/MQTTPacket/MQTTSubscribeClient.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,137 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - -/** - * Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters - * @param count the number of topic filter strings in topicFilters - * @param topicFilters the array of topic filter strings to be used in the publish - * @return the length of buffer needed to contain the serialized version of the packet - */ -int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[]) -{ - int i; - int len = 2; /* packetid */ - - for (i = 0; i < count; ++i) - len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */ - return len; -} - - -/** - * Serializes the supplied subscribe data into the supplied buffer, ready for sending - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied bufferr - * @param dup integer - the MQTT dup flag - * @param packetid integer - the MQTT packet identifier - * @param count - number of members in the topicFilters and reqQos arrays - * @param topicFilters - array of topic filter names - * @param requestedQoSs - array of requested QoS - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count, - MQTTString topicFilters[], int requestedQoSs[]) -{ - unsigned char *ptr = buf; - MQTTHeader header = {0}; - int rem_len = 0; - int rc = 0; - int i = 0; - - FUNC_ENTRY; - if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - - header.byte = 0; - header.bits.type = SUBSCRIBE; - header.bits.dup = dup; - header.bits.qos = 1; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; - - writeInt(&ptr, packetid); - - for (i = 0; i < count; ++i) - { - writeMQTTString(&ptr, topicFilters[i]); - writeChar(&ptr, requestedQoSs[i]); - } - - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - - -/** - * Deserializes the supplied (wire) buffer into suback data - * @param packetid returned integer - the MQTT packet identifier - * @param maxcount - the maximum number of members allowed in the grantedQoSs array - * @param count returned integer - number of members in the grantedQoSs array - * @param grantedQoSs returned array of integers - the granted qualities of service - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return error code. 1 is success, 0 is failure - */ -int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = 0; - int mylen; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != SUBACK) - goto exit; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - if (enddata - curdata < 2) - goto exit; - - *packetid = readInt(&curdata); - - *count = 0; - while (curdata < enddata) - { - if (*count > maxcount) - { - rc = -1; - goto exit; - } - grantedQoSs[(*count)++] = readChar(&curdata); - } - - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - -
--- a/MQTT/MQTTPacket/MQTTSubscribeServer.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,112 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - - -/** - * Deserializes the supplied (wire) buffer into subscribe data - * @param dup integer returned - the MQTT dup flag - * @param packetid integer returned - the MQTT packet identifier - * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays - * @param count - number of members in the topicFilters and requestedQoSs arrays - * @param topicFilters - array of topic filter names - * @param requestedQoSs - array of requested QoS - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, int maxcount, int* count, MQTTString topicFilters[], - int requestedQoSs[], unsigned char* buf, int buflen) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = -1; - int mylen = 0; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != SUBSCRIBE) - goto exit; - *dup = header.bits.dup; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - - *packetid = readInt(&curdata); - - *count = 0; - while (curdata < enddata) - { - if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) - goto exit; - if (curdata >= enddata) /* do we have enough data to read the req_qos version byte? */ - goto exit; - requestedQoSs[*count] = readChar(&curdata); - (*count)++; - } - - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Serializes the supplied suback data into the supplied buffer, ready for sending - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param packetid integer - the MQTT packet identifier - * @param count - number of members in the grantedQoSs array - * @param grantedQoSs - array of granted QoS - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs) -{ - MQTTHeader header = {0}; - int rc = -1; - unsigned char *ptr = buf; - int i; - - FUNC_ENTRY; - if (buflen < 2 + count) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - header.byte = 0; - header.bits.type = SUBACK; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, 2 + count); /* write remaining length */ - - writeInt(&ptr, packetid); - - for (i = 0; i < count; ++i) - writeChar(&ptr, grantedQoSs[i]); - - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - -
--- a/MQTT/MQTTPacket/MQTTUnsubscribe.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#ifndef MQTTUNSUBSCRIBE_H_ -#define MQTTUNSUBSCRIBE_H_ - -int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, - int count, MQTTString topicFilters[]); - -int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, MQTTString topicFilters[], - unsigned char* buf, int len); - -int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid); - -int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int len); - -#endif /* MQTTUNSUBSCRIBE_H_ */
--- a/MQTT/MQTTPacket/MQTTUnsubscribeClient.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,106 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - -/** - * Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters - * @param count the number of topic filter strings in topicFilters - * @param topicFilters the array of topic filter strings to be used in the publish - * @return the length of buffer needed to contain the serialized version of the packet - */ -int MQTTSerialize_unsubscribeLength(int count, MQTTString topicFilters[]) -{ - int i; - int len = 2; /* packetid */ - - for (i = 0; i < count; ++i) - len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/ - return len; -} - - -/** - * Serializes the supplied unsubscribe data into the supplied buffer, ready for sending - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @param dup integer - the MQTT dup flag - * @param packetid integer - the MQTT packet identifier - * @param count - number of members in the topicFilters array - * @param topicFilters - array of topic filter names - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, - int count, MQTTString topicFilters[]) -{ - unsigned char *ptr = buf; - MQTTHeader header = {0}; - int rem_len = 0; - int rc = -1; - int i = 0; - - FUNC_ENTRY; - if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - - header.byte = 0; - header.bits.type = UNSUBSCRIBE; - header.bits.dup = dup; - header.bits.qos = 1; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; - - writeInt(&ptr, packetid); - - for (i = 0; i < count; ++i) - writeMQTTString(&ptr, topicFilters[i]); - - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Deserializes the supplied (wire) buffer into unsuback data - * @param packetid returned integer - the MQTT packet identifier - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return error code. 1 is success, 0 is failure - */ -int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int buflen) -{ - unsigned char type = 0; - unsigned char dup = 0; - int rc = 0; - - FUNC_ENTRY; - rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen); - if (type == UNSUBACK) - rc = 1; - FUNC_EXIT_RC(rc); - return rc; -} - -
--- a/MQTT/MQTTPacket/MQTTUnsubscribeServer.c Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - *******************************************************************************/ - -#include "MQTTPacket.h" -#include "StackTrace.h" - -#include <string.h> - - -/** - * Deserializes the supplied (wire) buffer into unsubscribe data - * @param dup integer returned - the MQTT dup flag - * @param packetid integer returned - the MQTT packet identifier - * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays - * @param count - number of members in the topicFilters and requestedQoSs arrays - * @param topicFilters - array of topic filter names - * @param buf the raw buffer data, of the correct length determined by the remaining length field - * @param buflen the length in bytes of the data in the supplied buffer - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int maxcount, int* count, MQTTString topicFilters[], - unsigned char* buf, int len) -{ - MQTTHeader header = {0}; - unsigned char* curdata = buf; - unsigned char* enddata = NULL; - int rc = 0; - int mylen = 0; - - FUNC_ENTRY; - header.byte = readChar(&curdata); - if (header.bits.type != UNSUBSCRIBE) - goto exit; - *dup = header.bits.dup; - - curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ - enddata = curdata + mylen; - - *packetid = readInt(&curdata); - - *count = 0; - while (curdata < enddata) - { - if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) - goto exit; - (*count)++; - } - - rc = 1; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - - -/** - * Serializes the supplied unsuback data into the supplied buffer, ready for sending - * @param buf the buffer into which the packet will be serialized - * @param buflen the length in bytes of the supplied buffer - * @param packetid integer - the MQTT packet identifier - * @return the length of the serialized data. <= 0 indicates error - */ -int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid) -{ - MQTTHeader header = {0}; - int rc = 0; - unsigned char *ptr = buf; - - FUNC_ENTRY; - if (buflen < 2) - { - rc = MQTTPACKET_BUFFER_TOO_SHORT; - goto exit; - } - header.byte = 0; - header.bits.type = UNSUBACK; - writeChar(&ptr, header.byte); /* write header */ - - ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ - - writeInt(&ptr, packetid); - - rc = ptr - buf; -exit: - FUNC_EXIT_RC(rc); - return rc; -} - -
--- a/MQTT/MQTTPacket/StackTrace.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,78 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * Contributors: - * Ian Craggs - initial API and implementation and/or initial documentation - * Ian Craggs - fix for bug #434081 - *******************************************************************************/ - -#ifndef STACKTRACE_H_ -#define STACKTRACE_H_ - -#include <stdio.h> -#define NOSTACKTRACE 1 - -#if defined(NOSTACKTRACE) -#define FUNC_ENTRY -#define FUNC_ENTRY_NOLOG -#define FUNC_ENTRY_MED -#define FUNC_ENTRY_MAX -#define FUNC_EXIT -#define FUNC_EXIT_NOLOG -#define FUNC_EXIT_MED -#define FUNC_EXIT_MAX -#define FUNC_EXIT_RC(x) -#define FUNC_EXIT_MED_RC(x) -#define FUNC_EXIT_MAX_RC(x) - -#else - -#if defined(WIN32) -#define inline __inline -#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM) -#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1) -#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM) -#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM) -#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM) -#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1) -#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM) -#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM) -#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM) -#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM) -#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM) -#else -#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM) -#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1) -#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM) -#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM) -#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM) -#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1) -#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM) -#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM) -#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM) -#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM) -#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM) - -void StackTrace_entry(const char* name, int line, int trace); -void StackTrace_exit(const char* name, int line, void* return_value, int trace); - -void StackTrace_printStack(FILE* dest); -char* StackTrace_get(unsigned long); - -#endif - -#endif - - - - -#endif /* STACKTRACE_H_ */
--- a/MQTT/MQTTSocket.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,65 +0,0 @@ -#if !defined(MQTTSOCKET_H) -#define MQTTSOCKET_H - -#include "MQTTmbed.h" -#include "TCPSocket.h" - -class MQTTSocket -{ -public: - int open(NetworkInterface* nif) - { - return socket.open(nif); - } - - int connect(char* hostname, int port, int timeout=1000) - { - socket.set_blocking (false); - socket.set_timeout(timeout); - _mutex.lock(); - int ret = (int)socket.connect(hostname, port); - _mutex.unlock(); - - return ret; - } - - int read(unsigned char* buffer, int len, int timeout) - { - socket.set_blocking (false); - socket.set_timeout(timeout); - _mutex.lock(); - int ret = (int)socket.recv((char*)buffer, len); - _mutex.unlock(); - - return ret; - } - - int write(unsigned char* buffer, int len, int timeout) - { - socket.set_blocking (false); - socket.set_timeout(timeout); - _mutex.lock(); - int ret = (int)socket.send((char*)buffer, len); - _mutex.unlock(); - - return ret; - } - - int disconnect() - { - _mutex.lock(); - int ret = (int)socket.close(); - _mutex.unlock(); - - return ret; - } - -private: - Mutex _mutex; - TCPSocket socket; - -}; - - - -#endif
--- a/MQTT/MQTTmbed.h Wed Jun 14 07:48:45 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,49 +0,0 @@ -#if !defined(MQTT_MBED_H) -#define MQTT_MBED_H - -#include "mbed.h" - -class Countdown -{ -public: - Countdown() - { - t = Timer(); - } - - Countdown(int ms) - { - t = Timer(); - countdown_ms(ms); - } - - - bool expired() - { - return t.read_ms() >= interval_end_ms; - } - - void countdown_ms(unsigned long ms) - { - t.stop(); - interval_end_ms = ms; - t.reset(); - t.start(); - } - - void countdown(int seconds) - { - countdown_ms((unsigned long)seconds * 1000L); - } - - int left_ms() - { - return interval_end_ms - t.read_ms(); - } - -private: - Timer t; - unsigned long interval_end_ms; -}; - -#endif \ No newline at end of file
--- a/Milkcocoa.cpp Wed Jun 14 07:48:45 2017 +0000 +++ b/Milkcocoa.cpp Wed Sep 06 05:58:26 2017 +0000 @@ -1,4 +1,5 @@ #include "Milkcocoa.h" +#include "OldTimer.h" #if 0 extern RawSerial pc; @@ -100,8 +101,6 @@ for (int i=0; i<MILKCOCOA_SUBSCRIBERS; i++) { milkcocoaSubscribers[i] = NULL; } - connectting = false; - mqtt_connectting = false; #ifdef __MILKCOCOA_THREAD setLoopCycle(5000); @@ -124,8 +123,6 @@ for (int i=0; i<MILKCOCOA_SUBSCRIBERS; i++) { milkcocoaSubscribers[i] = NULL; } - connectting = false; - mqtt_connectting = false; #ifdef __MILKCOCOA_THREAD setLoopCycle(5000); @@ -146,14 +143,11 @@ if(client->isConnected()) return; - connectting = true; if(client->connect(servername, portnum)!=0) { DBG(pc.printf("Network connect err\r\n");) - connectting = false; return; } - connectting = false; - mqtt_connectting = true; + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.keepAliveInterval = 20; data.cleansession = 1; @@ -164,15 +158,9 @@ if (client->connect(data) != 0) { DBG(pc.printf("Milkcocoa connect err\r\n");) - mqtt_connectting = false; return; } - mqtt_connectting = false; -} -void Milkcocoa::close() -{ - client->disconnect(); } bool Milkcocoa::push(const char *path, DataElement dataelement) { @@ -249,7 +237,6 @@ } void Milkcocoa::loop() { - if((connectting == true)||(mqtt_connectting == true)) return; connect(); client->yield(1); @@ -307,7 +294,7 @@ void Milkcocoa::cycle_Thread1(void) { cycleThread1.signal_wait(START_THREAD); while(1) { - Timer timer; + OldTimer timer; timer.start(); connect();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OldTimer.cpp Wed Sep 06 05:58:26 2017 +0000 @@ -0,0 +1,85 @@ +/* mbed Microcontroller Library + * Copyright (c) 2006-2013 ARM Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "OldTimer.h" +#include "hal/ticker_api.h" +#include "hal/us_ticker_api.h" +#include "platform/mbed_critical.h" + + +OldTimer::OldTimer() : _running(), _start(), _time(), _ticker_data(get_us_ticker_data()) { + reset(); +} + +OldTimer::OldTimer(const ticker_data_t *data) : _running(), _start(), _time(), _ticker_data(data) { + reset(); +} + +void OldTimer::start() { + core_util_critical_section_enter(); + if (!_running) { + _start = ticker_read_us(_ticker_data); + _running = 1; + } + core_util_critical_section_exit(); +} + +void OldTimer::stop() { + core_util_critical_section_enter(); + _time += slicetime(); + _running = 0; + core_util_critical_section_exit(); +} + +int OldTimer::read_us() { + return read_high_resolution_us(); +} + +float OldTimer::read() { + return (float)read_us() / 1000000.0f; +} + +int OldTimer::read_ms() { + return read_high_resolution_us() / 1000; +} + +us_timestamp_t OldTimer::read_high_resolution_us() { + core_util_critical_section_enter(); + us_timestamp_t time = _time + slicetime(); + core_util_critical_section_exit(); + return time; +} + +us_timestamp_t OldTimer::slicetime() { + us_timestamp_t ret = 0; + core_util_critical_section_enter(); + if (_running) { + ret = ticker_read_us(_ticker_data) - _start; + } + core_util_critical_section_exit(); + return ret; +} + +void OldTimer::reset() { + core_util_critical_section_enter(); + _start = ticker_read_us(_ticker_data); + _time = 0; + core_util_critical_section_exit(); +} + +OldTimer::operator float() { + return read(); +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OldTimer.h Wed Sep 06 05:58:26 2017 +0000 @@ -0,0 +1,104 @@ +/* mbed Microcontroller Library + * Copyright (c) 2006-2013 ARM Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MBED_OLDTIMER_H +#define MBED_OLDTIMER_H + +#include "platform/platform.h" +#include "hal/ticker_api.h" +#include "platform/NonCopyable.h" + +/** \addtogroup drivers */ + +/** A general purpose timer + * + * @note Synchronization level: Interrupt safe + * + * Example: + * @code + * // Count the time to toggle a LED + * + * #include "mbed.h" + * + * Timer timer; + * DigitalOut led(LED1); + * int begin, end; + * + * int main() { + * timer.start(); + * begin = timer.read_us(); + * led = !led; + * end = timer.read_us(); + * printf("Toggle the led takes %d us", end - begin); + * } + * @endcode + * @ingroup drivers + */ +class OldTimer { + +public: + OldTimer(); + OldTimer(const ticker_data_t *data); + + /** Start the timer + */ + void start(); + + /** Stop the timer + */ + void stop(); + + /** Reset the timer to 0. + * + * If it was already counting, it will continue + */ + void reset(); + + /** Get the time passed in seconds + * + * @returns Time passed in seconds + */ + float read(); + + /** Get the time passed in milli-seconds + * + * @returns Time passed in milli seconds + */ + int read_ms(); + + /** Get the time passed in micro-seconds + * + * @returns Time passed in micro seconds + */ + int read_us(); + + /** An operator shorthand for read() + */ + operator float(); + + /** Get in a high resolution type the time passed in micro-seconds. + */ + us_timestamp_t read_high_resolution_us(); + +protected: + us_timestamp_t slicetime(); + int _running; // whether the timer is running + us_timestamp_t _start; // the start time of the latest slice + us_timestamp_t _time; // any accumulated time from previous slices + const ticker_data_t *_ticker_data; +}; + + +#endif