An MQTT Client for the new etherNet Interface.

Dependencies:   EthernetInterface mbed-rtos

Dependents:   AV_MQTT niMQTT_example

Files at this revision

API Documentation at this revision

Comitter:
Nim65s
Date:
Wed Aug 07 12:57:21 2013 +0000
Child:
1:4faa96fa4350
Commit message:
First fonctionnal release

Changed in this revision

niMQTT.cpp Show annotated file Show diff for this revision Revisions of this file
niMQTT.h Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/niMQTT.cpp	Wed Aug 07 12:57:21 2013 +0000
@@ -0,0 +1,353 @@
+#include "niMQTT.h"
+
+niMQTT::niMQTT(char *server, int port, char *id, void (*callback)(char*, char*), char *username, char *password, bool debug):
+    server(server), port(port), id(id), callback(callback), username(username), password(password),
+    debug(debug), connected(true), message_id(0), thread(&niMQTT::thread_starter, this),
+    waiting_new_packet(true), packet_sent(false), waiting_connack(0), waiting_suback(0), waiting_pingresp(0) {
+    init();
+}
+
+int niMQTT::init() {
+    if (debug) printf("*init\r\n");
+    socket = new TCPSocketConnection;
+    do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
+    socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
+
+    printf("Socket connected.\r\n");
+
+    thread.signal_set(START_THREAD);
+
+    return connect();
+}
+
+int niMQTT::send(char *packet, int size) {
+    //if (debug) {
+        printf("*send: ");
+        for(int i=0; i<size; i++) printf("0x%x ", packet[i]);
+        printf("\r\n");
+    //}
+
+    int j = -1;
+    do j = socket->send_all(packet, size); while (j < 0);
+
+    if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size);
+    else if (debug) printf("packet sent\r\n");
+    packet_sent = true;
+
+    return (j == size);
+}
+
+int niMQTT::recv() {
+    if (debug) printf("*recv\r\n");
+
+    int timeout = 0;
+    while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1);
+    if (timeout >= TIMEOUT/100) {
+        printf("RECV TIMEOUT\r\n");
+        if (waiting_connack > 0) printf("CONNACK not received !\r\n");
+        if (waiting_suback > 0) printf("SUBCONNACK not received !\r\n");
+        if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n");
+        // TODO launch connect/sub/pingrep again ?
+        return -1;
+    }
+
+    if (debug) printf("Receiving new packet...\r\n");
+
+    char header_received;
+    socket->receive(&header_received, 1);
+    if (debug) printf("Received 0x%x\r\n", header_received);
+
+    waiting_new_packet = false;
+    bool DUP = ((header_received & 4) == 4);
+    int QoS = (header_received & 6);
+    bool RETAIN = ((header_received & 1) == 1);
+
+    switch (header_received & 0xf0) {
+        case CONNACK: connack(); break;
+        case PUBLISH: publish_received(); break;
+        case PUBACK: puback_received(); break;
+        case SUBACK: suback(); break;
+        case UNSUBACK: suback(true); break;
+        case PINGRESP: pingresp(); break;
+        default: waiting_new_packet = true; printf("BAD HEADER: 0x%x\r\n", header_received); return -1;
+    }
+
+    return 0;
+}
+
+int niMQTT::connect() {
+    if (debug) printf("*connect\r\n");
+    int username_length = strlen(username);
+    int password_length = strlen(password);
+    int id_length = strlen(id);
+
+    int use_username = (username_length != 0);
+    int use_password = (password_length != 0);
+
+    char variable_header[] = {0,6,77,81,73,115,100,112,3,
+        use_username << 7 | use_password << 6,
+        KEEP_ALIVE / 256, KEEP_ALIVE % 256 };
+
+    int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password);
+    int packet_length = 2 + remaining_length;
+
+    char fixed_header[] = { CONNECT, remaining_length };
+
+    char packet[packet_length];
+    memcpy(packet, fixed_header, 2);
+    memcpy(packet + 2, variable_header, 12);
+
+    // Adds the payload: id
+    char id_size[2] = { id_length / 256, id_length % 256 };
+    memcpy(packet + 14, id_size, 2);
+    memcpy(packet + 16, id, id_length);
+
+    // Adds username & Password to the payload
+    if (use_username) {
+        char username_size[2] = { username_length / 256, username_length % 256 };
+        memcpy(packet + 16 + id_length, username_size, 2);
+        memcpy(packet + 18 + id_length, username, username_length);
+    }
+    if (use_password) {
+        char password_size[2] = { password_length / 256, password_length % 256 };
+        memcpy(packet + 18 + id_length + username_length, password_size, 2);
+        memcpy(packet + 20 + id_length + username_length, password, password_length);
+    }
+
+    waiting_connack++;
+
+    return send(packet, packet_length);
+}
+
+int niMQTT::connack() {
+    if (debug) printf("CONNACK Received\r\n");
+    if (waiting_connack > 0) waiting_connack--;
+    else printf("CONNACK UNEXPECTED !\r\n");
+
+    char resp[3];
+    socket->receive(resp, 3);
+    waiting_new_packet = true;
+
+    if (resp[0] != 0x2) printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]);
+    switch (resp[2]) {
+        case 0: printf("Connection Accepted\r\n"); break;
+        case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break;
+        case 2: printf("Connection Refused: identifier rejected\r\n"); break;
+        case 3: printf("Connection Refused: server unavailable\r\n"); break;
+        case 4: printf("Connection Refused: bad user name or password\r\n"); break;
+        case 5: printf("Connection Refused: not authorized\r\n"); break;
+        default: printf("I have no idea what I am doing\r\n");
+    }
+
+    return (resp[2] == 0);
+}
+
+int niMQTT::pub(char *topic, char *message) {
+    if (debug) printf("*pub\r\n");
+    int topic_length = strlen(topic);
+    int message_length = strlen(message);
+
+    int remaining_length = topic_length + message_length + 2;
+    int remaining_length_2 = remaining_length_length(remaining_length);
+    int packet_length = 1 + remaining_length + remaining_length_2;
+
+    char header = PUBLISH;
+    char packet[packet_length];
+    // header
+    memcpy(packet, &header, 1);
+    get_remaining_length(remaining_length, packet);
+
+    // variable header: topic name
+    char topic_size[2] = { topic_length / 256, topic_length % 256 };
+    memcpy(packet + 1 + remaining_length_2, topic_size, 2);
+    memcpy(packet + 3 + remaining_length_2, topic, topic_length);
+
+    // payload: message
+    memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length);
+
+    return send(packet, packet_length);
+}
+
+void niMQTT::publish_received() {
+    //remaining length
+    int remaining_length = decode_remaining_length();
+
+    // topic
+    char mqtt_utf8_length[2];
+    socket->receive(mqtt_utf8_length, 2);
+    int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1];
+
+    if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length);
+
+    char topic[utf8_length + 1];
+    socket->receive(topic, utf8_length);
+    topic[utf8_length] = 0;
+
+    // payload
+    int message_length = remaining_length - utf8_length - 2;
+    char message[message_length + 1];
+    socket->receive(message, message_length);
+    message[message_length] = 0;
+
+    waiting_new_packet = true;
+
+    callback(topic, message);
+}
+
+int niMQTT::puback() {
+    if (debug) printf("*puback\r\n");
+    char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 };
+    return send(fixed_header, 4);
+}
+
+int niMQTT::puback_received() {
+    waiting_new_packet = true;
+    return 0; // TODO
+}
+
+int niMQTT::sub(char *topic, bool unsub) {
+    if (debug) printf("*sub\r\n");
+    char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE;
+    int topic_length = strlen(topic);
+
+    int remaining_length = topic_length + 5;
+    int remaining_length_2 = remaining_length_length(remaining_length);
+    int packet_length = 1 + remaining_length + remaining_length_2;
+
+    char header = command | LEAST_ONCE;
+    char packet[packet_length];
+    // header
+    memcpy(packet, &header, 1);
+    get_remaining_length(remaining_length, packet);
+
+    // variable header: message identifier
+    message_id++;
+    char variable_header[] = { message_id / 256, message_id % 256 };
+    memcpy(packet + 1 + remaining_length_2, variable_header, 2);
+
+    // payload: topic name & requested QoS
+    char topic_size[2] = { topic_length / 256, topic_length % 256 };
+    char requested_qos = MOST_ONCE;
+    memcpy(packet + 3 + remaining_length_2, topic_size, 2);
+    memcpy(packet + 5 + remaining_length_2, topic, topic_length);
+    memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1);
+
+    waiting_suback++;
+
+    return send(packet, packet_length);
+}
+
+int niMQTT::suback(bool unsub) {
+    if (debug) printf("SUBACK received\r\n");
+    if (waiting_suback > 0) waiting_suback--;
+    else printf("SUBACK UNEXPECTED !\r\n");
+
+    char command = (unsub) ? UNSUBACK : SUBACK; // TODO
+
+    int remaining_length = decode_remaining_length();
+
+    // Variable Header
+    char var_resp[remaining_length];
+    socket->receive(var_resp, remaining_length);
+    waiting_new_packet = true;
+    if (debug) {
+        printf("suback: ");
+        for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]);
+        printf("\r\n");
+    }
+
+    if (var_resp[0] * 256 + var_resp[1] != message_id) {
+        printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id);
+    }
+
+    // here we should do things about the QoS if /unsuback, but let's say it's 0.
+
+    return (var_resp[0] * 256 + var_resp[1] == message_id);
+}
+
+int niMQTT::pingreq () {
+    if (debug) printf("*pingreq\r\n");
+    char fixed_header[] = { PINGREQ, 0 };
+    waiting_pingresp++;
+    return send(fixed_header, 2);
+}
+
+int niMQTT::pingresp() {
+    if (debug) printf("PINGRESP Received\r\n");
+    if (waiting_pingresp > 0) waiting_pingresp--;
+    else printf("PINGRESP Unexpected !\r\n");
+
+    char resp;
+    socket->receive(&resp, 1);
+    waiting_new_packet = true;
+
+    if (resp != 0) printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp);
+
+    return (resp == 0);
+}
+
+int niMQTT::disconnect() {
+    if (debug) printf("*disconnect\r\n");
+    char fixed_header[] = { DISCONNECT, 0 };
+    return send(fixed_header, 2);
+}
+
+niMQTT::~niMQTT() {
+    if (debug) printf("*~niMQTT()\r\n");
+    connected = false;
+    disconnect();
+    socket->close();
+    delete socket;
+}
+
+void niMQTT::thread_starter(void const *p) {
+    niMQTT *instance = (niMQTT*)p;
+    instance->thread_worker();
+}
+
+void niMQTT::thread_worker() {
+    if (debug) printf("*thread_worker\r\n");
+    thread.signal_wait(START_THREAD);
+    while (connected) {
+        if (debug) printf("New loop in thread worker\r\n");
+        recv();
+        Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds
+        if (!packet_sent) pingreq();
+        packet_sent = false;
+    }
+}
+
+void niMQTT::get_remaining_length(int remaining_length, char *packet) {
+    int X = remaining_length;
+    int n = 1;
+    char digit;
+    do {
+        digit = X % 0x80;
+        X /= 0x80;
+        if (X > 0) digit |= 0x80;
+        memcpy(packet + n, &digit, 1);
+        n++;
+    } while (X > 0);
+}
+
+int niMQTT::decode_remaining_length() {
+    int multiplier = 1;
+    int value = 0;
+    char digit = 0;
+    do {
+        while (socket->receive(&digit, 1) < 0) wait(0.1);
+        value += (digit & 127) * multiplier;
+        multiplier *= 128;
+    } while ((digit & 0x80) != 0);
+    return value;
+}
+
+int niMQTT::remaining_length_length(int remaining_length) {
+    int X = remaining_length;
+    int rll = 0;
+    do {
+        rll++;
+        X /= 0x80;
+    } while (X > 0);
+    return rll;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/niMQTT.h	Wed Aug 07 12:57:21 2013 +0000
@@ -0,0 +1,123 @@
+#ifndef NIMQTT_H
+#define NIMQTT_H
+
+#include "mbed.h"
+#include "rtos.h"
+#include "EthernetInterface.h"
+
+enum { // Message Type
+    ZERO,
+    CONNECT_NUM,
+    CONNACK_NUM,
+    PUBLISH_NUM,
+    PUBACK_NUM,
+    PUBREC_NUM,
+    PUBREL_NUM,
+    PUBCOMP_NUM,
+    SUBSCRIBE_NUM,
+    SUBACK_NUM,
+    UNSUBSCRIBE_NUM,
+    UNSUBACK_NUM,
+    PINGREQ_NUM,
+    PINGRESP_NUM,
+    DISCONNECT_NUM
+};
+
+#define CONNECT     CONNECT_NUM     << 4
+#define CONNACK     CONNACK_NUM     << 4
+#define PUBLISH     PUBLISH_NUM     << 4
+#define PUBACK      PUBACK_NUM      << 4
+#define PUBREC      PUBREC_NUM      << 4
+#define PUBREL      PUBREL_NUM      << 4
+#define PUBCOMP     PUBCOMP_NUM     << 4
+#define SUBSCRIBE   SUBSCRIBE_NUM   << 4
+#define SUBACK      SUBACK_NUM      << 4
+#define UNSUBSCRIBE UNSUBSCRIBE_NUM << 4
+#define UNSUBACK    UNSUBACK_NUM    << 4
+#define PINGREQ     PINGREQ_NUM     << 4
+#define PINGRESP    PINGRESP_NUM    << 4
+#define DISCONNECT  DISCONNECT_NUM  << 4
+
+enum { // QoS level
+    MOST_ONCE_NUM,
+    LEAST_ONCE_NUM,
+    EXACTLY_ONCE_NUM
+};
+
+#define MOST_ONCE       MOST_ONCE_NUM       << 1
+#define LEAST_ONCE      LEAST_ONCE_NUM      << 1
+#define EXACTLY_ONCE    EXACTLY_ONCE_NUM    << 1
+
+#define KEEP_ALIVE 300 // seconds
+#define TIMEOUT 1000 // ms
+
+#define START_THREAD 1
+
+class niMQTT {
+    public:
+        /** Initialise and launch the MQTT Client
+         * \param server the address of your server
+         * \param port the port of your server
+         * \param id the id of this client (should be unique)
+         * \param callback a callback to execute on receiving a PUBLISH
+         * \param username your username for the server
+         * \param password your password for the server
+         * \param debug get a more verbose output
+         */
+        niMQTT(char *server, int port=1884, char *id="mbed", void (*callback)(char *, char*), char *username="", char *password="", bool debug=false);
+        ~niMQTT();
+
+        /* Publish a message on a topic
+         * \param topic the topic
+         * \param message the message
+         */
+        int pub(char *topic, char *message);
+
+        /** Subscribe to a topic
+         * \param topic the topic
+         */
+        int sub(char *topic, bool unsub=false);
+
+    private:
+        char *server;
+        int port;
+        char *id;
+        void (*callback)(char *, char*);
+        char *username;
+        char *password;
+
+        bool debug;
+        bool connected;
+        int message_id;
+
+        TCPSocketConnection *socket;
+
+        int remaining_length_length(int remaining_length);
+        void get_remaining_length(int remaining_length, char *packet);
+        int send(char *packet, int size);
+
+        int decode_remaining_length();
+        int recv();
+
+        int init();
+
+        int connect();
+        int connack();
+        int suback(bool unsub=false);
+        int puback();
+        int puback_received();
+        int pingreq();
+        int pingresp();
+        int disconnect();
+
+        static void thread_starter(void const *p);
+        void thread_worker();
+        Thread thread;
+
+        void publish_received();
+
+        bool waiting_new_packet, packet_sent;
+        int waiting_connack, waiting_suback, waiting_pingresp;
+};
+
+#endif