Arch GPRS MQTT demo

Dependencies:   GPRSInterface USBDevice mbed

Fork of Seeed_HTTPClient_GPRSInterface_HelloWorld by Seeed

Files at this revision

API Documentation at this revision

Comitter:
yihui
Date:
Fri Apr 03 08:29:43 2015 +0000
Parent:
2:ecc68e79d692
Child:
4:14a9621ec99c
Commit message:
Arch GPRS MQTT demo

Changed in this revision

GPRSInterface.lib Show annotated file Show diff for this revision Revisions of this file
HTTPClient_GPRS.lib Show diff for this revision Revisions of this file
PubSubClient/PubSubClient.cpp Show annotated file Show diff for this revision Revisions of this file
PubSubClient/PubSubClient.h Show annotated file Show diff for this revision Revisions of this file
USBDevice.lib Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed.bld Show annotated file Show diff for this revision Revisions of this file
--- a/GPRSInterface.lib	Fri Mar 06 09:22:41 2015 +0000
+++ b/GPRSInterface.lib	Fri Apr 03 08:29:43 2015 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/users/lawliet/code/GPRSInterface/#180feb3ebe62
+http://mbed.org/users/lawliet/code/GPRSInterface/#379ce1d51b88
--- a/HTTPClient_GPRS.lib	Fri Mar 06 09:22:41 2015 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-http://mbed.org/users/lawliet/code/HTTPClient_GPRS/#aaab2081c1c0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/PubSubClient/PubSubClient.cpp	Fri Apr 03 08:29:43 2015 +0000
@@ -0,0 +1,326 @@
+/*
+PubSubClient.cpp - A simple client for MQTT.
+Nicholas O'Leary
+http://knolleary.net
+
+initial port for mbed 
+Joerg Wende
+https://twitter.com/joerg_wende
+*/
+
+
+
+#include "PubSubClient.h"
+
+#include "USBSerial.h"
+
+extern USBSerial pc;
+
+#define LOG(args...)    pc.printf(args)
+
+Timer t;
+
+int millis()
+{
+    return t.read_ms();
+}
+
+PubSubClient::PubSubClient()
+{
+}
+
+PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int))
+{
+    this->callback = callback;
+    this->ip = ip;
+    this->port = port;
+    t.start();
+}
+
+
+bool PubSubClient::connect(char *id)
+{
+    return connect(id,NULL,NULL,0,0,0,0);
+}
+
+bool PubSubClient::connect(char *id, char *user, char *pass)
+{
+    return connect(id,user,pass,0,0,0,0);
+}
+
+bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
+{
+    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
+}
+
+bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
+{
+    if (!connected()) {
+        int result = 0;
+        result = _client.connect(this->ip, this->port);
+        _client.set_blocking(false, 1);
+        LOG("IP: %s\r\n",this->ip);
+        LOG("Port: %i\r\n",this->port);
+        LOG("Result: %i \r\n", result);
+
+        if (result==0) {
+            nextMsgId = 1;
+            char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
+            // Leave room in the buffer for header and variable length field
+            int length = 5;
+            unsigned int j;
+            for (j = 0; j<9; j++) {
+                buffer[length++] = d[j];
+            }
+
+            char v;
+            if (willTopic) {
+                v = 0x06|(willQos<<3)|(willRetain<<5);
+            } else {
+                v = 0x02;
+            }
+
+            if(user != NULL) {
+                v = v|0x80;
+
+                if(pass != NULL) {
+                    v = v|(0x80>>1);
+                }
+            }
+
+            buffer[length++] = v;
+
+            buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
+            buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
+            length = writeString(id,buffer,length);
+            if (willTopic) {
+                length = writeString(willTopic,buffer,length);
+                length = writeString(willMessage,buffer,length);
+            }
+
+            if(user != NULL) {
+                length = writeString(user,buffer,length);
+                if(pass != NULL) {
+                    length = writeString(pass,buffer,length);
+                }
+            }
+            LOG("Before MQTT Connect ... \r\n");
+            write(MQTTCONNECT,buffer,length-5);
+
+            lastInActivity = lastOutActivity = millis();
+
+            int llen=128;
+            int len =0;
+            
+            while ((len=readPacket(llen))==0) {
+                unsigned long t = millis();
+                if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
+                    _client.close();
+                    return false;
+                }
+            }
+            LOG("after MQTT Connect ... %i\r\n", len);
+            if (len == 4 && buffer[3] == 0) {
+                lastInActivity = millis();
+                pingOutstanding = false;
+                return true;
+            }
+        }
+        _client.close();
+    }
+    return false;
+}
+
+
+int PubSubClient::readPacket(int lengthLength)
+{
+    int len = 0;
+    len = _client.receive_all(buffer,lengthLength);
+    return len;
+}
+
+bool PubSubClient::loop()
+{
+    if (connected()) {
+        unsigned long t = millis();
+        if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
+            if (pingOutstanding) {
+                _client.close();
+                return false;
+            } else {
+                buffer[0] = MQTTPINGREQ;
+                buffer[1] = 0;
+                _client.send(buffer,2);
+                lastOutActivity = t;
+                lastInActivity = t;
+                pingOutstanding = true;
+            }
+        }
+        int len;
+        int llen= 128;
+        if (!((len=readPacket(llen))==0)) {
+            if (len > 0) {
+                lastInActivity = t;
+                char type = buffer[0]&0xF0;
+                if (type == MQTTPUBLISH) {
+                    if (callback) {
+                        LOG("MQTTPUBLISH - bytes received:%i\r\n", len);
+                        int tl = (buffer[2]<<8)+buffer[3];
+                        LOG("t1:%i\r\n",tl);
+                        char topic[tl+1];
+                        for (int i=0; i<tl; i++) {
+                            topic[i] = buffer[4+i];
+                        }
+                        topic[tl] = 0;
+                        LOG("Topic:%s\r\n",topic);
+                        // ignore msgID - only support QoS 0 subs
+                        int t2 = len-4-tl;
+                        LOG("t2:%i\r\n",t2);
+                        char payload[t2+1];
+                        for (int i=0; i<t2; i++) {
+                            payload[i] = buffer[4+i+tl];
+                        }
+                        payload[t2] = 0;
+                        LOG("Payload:%s\r\n", payload);
+                        callback(topic,payload,t2);
+                    }
+                } else if (type == MQTTPINGREQ) {
+                    buffer[0] = MQTTPINGRESP;
+                    buffer[1] = 0;
+                    _client.send(buffer,2);
+                } else if (type == MQTTPINGRESP) {
+                    pingOutstanding = false;
+                }
+            }
+        }
+        return true;
+    }
+    return false;
+}
+
+bool PubSubClient::publish(char* topic, char* payload)
+{
+    return publish(topic,payload,strlen(payload),false);
+}
+
+bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
+{
+    return publish(topic, payload, plength, false);
+}
+
+bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
+{
+    if (connected()) {
+        // Leave room in the buffer for header and variable length field
+        LOG("publish - topic:%s, payload:%s\r\n", topic, payload);
+        int length = 5;
+        length = writeString(topic,buffer,length);
+        int i;
+        for (i=0; i<plength; i++) {
+            buffer[length++] = payload[i];
+        }
+        short header = MQTTPUBLISH;
+        if (retained) {
+            header |= 1;
+        }
+        return write(header,buffer,length-5);
+    }
+    return false;
+}
+
+
+
+bool PubSubClient::write(short header, char* buf, int length)
+{
+    short lenBuf[4];
+    short llen = 0;
+    short digit;
+    short pos = 0;
+    short rc;
+    short len = length;
+    do {
+        digit = len % 128;
+        len = len / 128;
+        if (len > 0) {
+            digit |= 0x80;
+        }
+        lenBuf[pos++] = digit;
+        llen++;
+    } while(len>0);
+
+    buf[4-llen] = header;
+    for (int i=0; i<llen; i++) {
+        buf[5-llen+i] = lenBuf[i];
+    }
+    rc = _client.send(buf+(4-llen),length+1+llen);
+
+    lastOutActivity = millis();
+    return (rc == 1+llen+length);
+}
+
+bool PubSubClient::subscribe(char* topic)
+{
+    LOG("subscribe - topic:%s\r\n",topic);
+    if (connected()) {
+        // Leave room in the buffer for header and variable length field
+        int length = 5;
+        nextMsgId++;
+        if (nextMsgId == 0) {
+            nextMsgId = 1;
+        }
+        buffer[length++] = (nextMsgId >> 8);
+        buffer[length++] = (nextMsgId & 0xFF);
+        length = writeString(topic, buffer,length);
+        buffer[length++] = 0; // Only do QoS 0 subs
+        return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
+    }
+    return false;
+}
+
+bool PubSubClient::unsubscribe(char* topic)
+{
+    if (connected()) {
+        int length = 5;
+        nextMsgId++;
+        if (nextMsgId == 0) {
+            nextMsgId = 1;
+        }
+        buffer[length++] = (nextMsgId >> 8);
+        buffer[length++] = (nextMsgId & 0xFF);
+        length = writeString(topic, buffer,length);
+        return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
+    }
+    return false;
+}
+
+void PubSubClient::disconnect()
+{
+    buffer[0] = MQTTDISCONNECT;
+    buffer[1] = 0;
+    _client.send(buffer,2);
+    _client.close();
+    lastInActivity = lastOutActivity = millis();
+}
+
+int PubSubClient::writeString(char* string, char* buf, int pos)
+{
+    char* idp = string;
+    int i = 0;
+    pos += 2;
+    while (*idp) {
+        buf[pos++] = *idp++;
+        i++;
+    }
+    buf[pos-i-2] = (i >> 8);
+    buf[pos-i-1] = (i & 0xFF);
+    return pos;
+}
+
+
+bool PubSubClient::connected()
+{
+    bool rc;
+    rc = (int)_client.is_connected();
+//     if (!rc) _client.close();
+    return rc;
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/PubSubClient/PubSubClient.h	Fri Apr 03 08:29:43 2015 +0000
@@ -0,0 +1,76 @@
+/*
+PubSubClient.h - A simple client for MQTT.
+Nicholas O'Leary
+http://knolleary.net
+*/
+#include "mbed.h"
+#include "GPRSInterface.h"
+#include "TCPSocketConnection.h"
+
+#ifndef PubSubClient_h
+#define PubSubClient_h
+
+
+
+// MQTT_MAX_PACKET_SIZE : Maximum packet size
+#define MQTT_MAX_PACKET_SIZE 128
+
+// MQTT_KEEPALIVE : keepAlive interval in Seconds
+#define MQTT_KEEPALIVE 15
+
+#define MQTTPROTOCOLVERSION 3
+#define MQTTCONNECT 1 << 4 // Client request to connect to Server
+#define MQTTCONNACK 2 << 4 // Connect Acknowledgment
+#define MQTTPUBLISH 3 << 4 // Publish message
+#define MQTTPUBACK 4 << 4 // Publish Acknowledgment
+#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1)
+#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2)
+#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3)
+#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request
+#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment
+#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request
+#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment
+#define MQTTPINGREQ 12 << 4 // PING Request
+#define MQTTPINGRESP 13 << 4 // PING Response
+#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting
+#define MQTTReserved 15 << 4 // Reserved
+
+#define MQTTQOS0 (0 << 1)
+#define MQTTQOS1 (1 << 1)
+#define MQTTQOS2 (2 << 1)
+
+class PubSubClient {
+private:
+   TCPSocketConnection _client;
+   char buffer[MQTT_MAX_PACKET_SIZE];
+   int nextMsgId;
+   unsigned long lastOutActivity;
+   unsigned long lastInActivity;
+   bool pingOutstanding;
+   void (*callback)(char*,char*,unsigned int);
+   int readPacket(int);
+   char readByte();
+   bool write(short header, char* buf, int length);
+   int writeString(char* string, char* buf, int pos);
+   char* ip;
+   int port;
+public:
+   PubSubClient();
+   PubSubClient(char*, int, void(*)(char*,char*,unsigned int));
+   bool connect(char *);
+   bool connect(char *, char *, char *);
+   bool connect(char *, char *, short, short, char *);
+   bool connect(char *, char *, char *, char *, short, short, char*);
+   void disconnect();
+   bool publish(char *, char *);
+   bool publish(char *, char *, unsigned int);
+   bool publish(char *, char *, unsigned int, bool);
+//   bool publish_P(char *, short PROGMEM *, unsigned int, bool);
+   bool subscribe(char *);
+   bool unsubscribe(char *);
+   bool loop();
+   bool connected();
+};
+
+
+#endif
\ No newline at end of file
--- a/USBDevice.lib	Fri Mar 06 09:22:41 2015 +0000
+++ b/USBDevice.lib	Fri Apr 03 08:29:43 2015 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/users/mbed_official/code/USBDevice/#4f589e246b9e
+http://mbed.org/users/mbed_official/code/USBDevice/#0f216c4e75e5
--- a/main.cpp	Fri Mar 06 09:22:41 2015 +0000
+++ b/main.cpp	Fri Apr 03 08:29:43 2015 +0000
@@ -1,43 +1,57 @@
+
+/* Copyright (c) 2010-2011 mbed.org, MIT License
+*
+* Permission is hereby granted, free of charge, to any person obtaining a copy of this software
+* and associated documentation files (the "Software"), to deal in the Software without
+* restriction, including without limitation the rights to use, copy, modify, merge, publish,
+* distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
+* Software is furnished to do so, subject to the following conditions:
+*
+* The above copyright notice and this permission notice shall be included in all copies or
+* substantial portions of the Software.
+*
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
+* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
 #include "mbed.h"
 #include "GPRSInterface.h"
-#include "HTTPClient.h"
-
-#ifdef TARGET_ARCH_GPRS
+#include "PubSubClient.h"
 #include "USBSerial.h"
-#define LOG(args...)    pc.printf(args)
-USBSerial pc;
-#else
-#define LOG(args...)    pc.printf(args)
-Serial pc(USBTX, USBRX);
-#endif
+
+#define NETWORK_APN     ""
 
-#define TEST_HTTP_GET       1
-#define TEST_HTTP_POST      1
-#define TEST_HTTP_PUT       1
-#define TEST_HTTP_DELETE    1
-
-#define PIN_TX              P1_27
-#define PIN_RX              P1_26
+#define PIN_PWR                 P1_2    //power up gprs module
+#define PIN_PWR_KEY             P1_7
+#define PIN_TX                  P1_27   //Serial tx pin
+#define PIN_RX                  P1_26   //Serial rx pin
 
 
-GPRSInterface gprs(PIN_TX, PIN_RX, 115200, "uninet", NULL, NULL);
-HTTPClient http;
-char str[1024];
+DigitalOut power(PIN_PWR);
+DigitalOut powerKey(PIN_PWR_KEY);
 
-#ifdef TARGET_ARCH_GPRS
-DigitalOut power(P1_2);
-DigitalOut powerKey(P1_7);
+USBSerial pc;
+
+//Serial uart(USBTX, USBRX);
+GPRSInterface eth(PIN_TX, PIN_RX, NETWORK_APN, "", "");
 
-/* power pin: low enable
-     ___
-        |___
+char* serverIpAddr = "opensensors.io";  /*Sever ip address*/
+int port = 1883; /*Sever Port*/
+void callback(char* topic, char* payload, unsigned int len); /*Callback function prototype*/
+PubSubClient mqtt(serverIpAddr, port, callback);
 
-   powerKey pin: you can also power up by long press the powerKey.
+void callback(char* topic, char* payload, unsigned int len)
+{
+    pc.printf("Topic:%s\r\n", topic);
+    pc.printf("Payload:%s\r\n\r\n", payload);
 
-        ___
-    ___|   |___
+    //Send incoming payloads back to topic "/mbed".
+    mqtt.publish("/users/yihui/message", payload, len);
+}
 
-*/
 void gprsPowerUp(void)
 {
     power = 1;
@@ -52,78 +66,59 @@
     powerKey = 0;
     wait(3);
 }
-#endif
 
 int main()
 {
-#ifdef TARGET_ARCH_GPRS
+    pc.printf("Powering up!");
     gprsPowerUp();
-#endif
-    gprs.init();
-
-    while(false == gprs.connect()) {
-        LOG("gprs connect error\n");
-        wait(2);
+    wait(10);
+    
+    // Initialize the interface.
+    int s = eth.init();
+    if (s != NULL) {
+        pc.printf(">>> Could not initialise. Halting!\n");
+        exit(0);
     }
 
-    // successful DHCP
-    LOG("IP Address is %s\n", gprs.getIPAddress());
-
-    int ret;
-    HTTPMap map;
-    HTTPText inText(str, 1024);
-    HTTPText outText(str);
-
-#if TEST_HTTP_GET
-    //GET data
-    LOG("\nTrying to fetch page...\n");
-    ret = http.get("http://developer.mbed.org/media/uploads/mbed_official/hello.txt", str, 1024);
-    if (!ret) {
-        LOG("Page fetched successfully - read %d characters\n", strlen(str));
-        LOG("Result: %s\n", str);
-    } else {
-        LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode());
-    }
-#endif
+    pc.printf(">>> Get IP address...\n");
+    while (1) {
+        s = eth.connect(); // Connect to network
 
-#if TEST_HTTP_POST
-    //POST data
-    map.put("Hello", "World");
-    map.put("test", "1234");
-    LOG("\nTrying to post data...\n");
-    ret = http.post("http://httpbin.org/post", map, &inText);
-    if (!ret) {
-        LOG("Executed POST successfully\n");
-    } else {
-        LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode());
+        if (s == false || s < 0) {
+            pc.printf(">>> Could not connect to network. Retrying!\n");
+            wait(3);
+        } else {
+            break;
+        }
     }
-#endif
-
-#if TEST_HTTP_PUT
-    //PUT data
-    strcpy(str, "This is a PUT test!");
-    LOG("\nTrying to put resource...\n");
-    ret = http.put("http://httpbin.org/put", outText, &inText);
-    if (!ret) {
-        LOG("Executed PUT successfully - read %d characters\n", strlen(str));
-        LOG("Result: %s\n", str);
-    } else {
-        LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode());
+    pc.printf(">>> Got IP address: %s\n", eth.getIPAddress());
+    
+    char clientID[] = "1095";   /*Client nanme show for MQTT server*/
+    char user[] = "yihui";
+    char password[] = "cLkFPjQa";
+    char pub_topic[] = "/users/yihui/test";   /*Publish to topic : "/users/kinan/test" */
+    char sub_topic[] = "/users/yihui/test";   /*Subscribe to topic : "/users/kinan/test" */
+    
+    
+mqtt_connect:
+    while (!mqtt.connect(clientID, user, password)){
+        pc.printf("\r\nConnect to server failed ..\r\n");
+        pc.printf("wait 3 seconds. Retrying\n");
+        wait(3);
     }
-#endif
-
-#if TEST_HTTP_DELETE
-    //DELETE data
-    LOG("\nTrying to delete resource...\n");
-    ret = http.del("http://httpbin.org/delete", &inText);
-    if (!ret) {
-        LOG("Executed DELETE successfully\n");
-    } else {
-        LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode());
+    pc.printf("\r\nConnect to server sucessed ..\r\n");
+    
+    mqtt.publish(pub_topic, "Hello from Arch GPRS");
+    mqtt.subscribe(sub_topic);
+    
+    while(1) {
+        if (!mqtt.loop()) {
+            goto mqtt_connect;
+        }
     }
-#endif
-
-    gprs.disconnect();
-
+    
+    // Disconnect from network
+    //eth.disconnect();
+    
     return 0;
 }
--- a/mbed.bld	Fri Mar 06 09:22:41 2015 +0000
+++ b/mbed.bld	Fri Apr 03 08:29:43 2015 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/users/mbed_official/code/mbed/builds/7e07b6fb45cf
\ No newline at end of file
+http://mbed.org/users/mbed_official/code/mbed/builds/487b796308b0
\ No newline at end of file