Arch GPRS MQTT demo
Dependencies: GPRSInterface USBDevice mbed
Fork of Seeed_HTTPClient_GPRSInterface_HelloWorld by
Revision 3:9dc67659d945, committed 2015-04-03
- Comitter:
- yihui
- Date:
- Fri Apr 03 08:29:43 2015 +0000
- Parent:
- 2:ecc68e79d692
- Child:
- 4:14a9621ec99c
- Commit message:
- Arch GPRS MQTT demo
Changed in this revision
--- a/GPRSInterface.lib Fri Mar 06 09:22:41 2015 +0000 +++ b/GPRSInterface.lib Fri Apr 03 08:29:43 2015 +0000 @@ -1,1 +1,1 @@ -http://mbed.org/users/lawliet/code/GPRSInterface/#180feb3ebe62 +http://mbed.org/users/lawliet/code/GPRSInterface/#379ce1d51b88
--- a/HTTPClient_GPRS.lib Fri Mar 06 09:22:41 2015 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -http://mbed.org/users/lawliet/code/HTTPClient_GPRS/#aaab2081c1c0
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubSubClient/PubSubClient.cpp Fri Apr 03 08:29:43 2015 +0000 @@ -0,0 +1,326 @@ +/* +PubSubClient.cpp - A simple client for MQTT. +Nicholas O'Leary +http://knolleary.net + +initial port for mbed +Joerg Wende +https://twitter.com/joerg_wende +*/ + + + +#include "PubSubClient.h" + +#include "USBSerial.h" + +extern USBSerial pc; + +#define LOG(args...) pc.printf(args) + +Timer t; + +int millis() +{ + return t.read_ms(); +} + +PubSubClient::PubSubClient() +{ +} + +PubSubClient::PubSubClient(char *ip, int port, void (*callback)(char*,char*,unsigned int)) +{ + this->callback = callback; + this->ip = ip; + this->port = port; + t.start(); +} + + +bool PubSubClient::connect(char *id) +{ + return connect(id,NULL,NULL,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char *user, char *pass) +{ + return connect(id,user,pass,0,0,0,0); +} + +bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage) +{ + if (!connected()) { + int result = 0; + result = _client.connect(this->ip, this->port); + _client.set_blocking(false, 1); + LOG("IP: %s\r\n",this->ip); + LOG("Port: %i\r\n",this->port); + LOG("Result: %i \r\n", result); + + if (result==0) { + nextMsgId = 1; + char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + // Leave room in the buffer for header and variable length field + int length = 5; + unsigned int j; + for (j = 0; j<9; j++) { + buffer[length++] = d[j]; + } + + char v; + if (willTopic) { + v = 0x06|(willQos<<3)|(willRetain<<5); + } else { + v = 0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + LOG("Before MQTT Connect ... \r\n"); + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + int llen=128; + int len =0; + + while ((len=readPacket(llen))==0) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _client.close(); + return false; + } + } + LOG("after MQTT Connect ... %i\r\n", len); + if (len == 4 && buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + return true; + } + } + _client.close(); + } + return false; +} + + +int PubSubClient::readPacket(int lengthLength) +{ + int len = 0; + len = _client.receive_all(buffer,lengthLength); + return len; +} + +bool PubSubClient::loop() +{ + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + _client.close(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client.send(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + int len; + int llen= 128; + if (!((len=readPacket(llen))==0)) { + if (len > 0) { + lastInActivity = t; + char type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + LOG("MQTTPUBLISH - bytes received:%i\r\n", len); + int tl = (buffer[2]<<8)+buffer[3]; + LOG("t1:%i\r\n",tl); + char topic[tl+1]; + for (int i=0; i<tl; i++) { + topic[i] = buffer[4+i]; + } + topic[tl] = 0; + LOG("Topic:%s\r\n",topic); + // ignore msgID - only support QoS 0 subs + int t2 = len-4-tl; + LOG("t2:%i\r\n",t2); + char payload[t2+1]; + for (int i=0; i<t2; i++) { + payload[i] = buffer[4+i+tl]; + } + payload[t2] = 0; + LOG("Payload:%s\r\n", payload); + callback(topic,payload,t2); + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client.send(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +bool PubSubClient::publish(char* topic, char* payload) +{ + return publish(topic,payload,strlen(payload),false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength) +{ + return publish(topic, payload, plength, false); +} + +bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained) +{ + if (connected()) { + // Leave room in the buffer for header and variable length field + LOG("publish - topic:%s, payload:%s\r\n", topic, payload); + int length = 5; + length = writeString(topic,buffer,length); + int i; + for (i=0; i<plength; i++) { + buffer[length++] = payload[i]; + } + short header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(header,buffer,length-5); + } + return false; +} + + + +bool PubSubClient::write(short header, char* buf, int length) +{ + short lenBuf[4]; + short llen = 0; + short digit; + short pos = 0; + short rc; + short len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0; i<llen; i++) { + buf[5-llen+i] = lenBuf[i]; + } + rc = _client.send(buf+(4-llen),length+1+llen); + + lastOutActivity = millis(); + return (rc == 1+llen+length); +} + +bool PubSubClient::subscribe(char* topic) +{ + LOG("subscribe - topic:%s\r\n",topic); + if (connected()) { + // Leave room in the buffer for header and variable length field + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + buffer[length++] = 0; // Only do QoS 0 subs + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +bool PubSubClient::unsubscribe(char* topic) +{ + if (connected()) { + int length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() +{ + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client.send(buffer,2); + _client.close(); + lastInActivity = lastOutActivity = millis(); +} + +int PubSubClient::writeString(char* string, char* buf, int pos) +{ + char* idp = string; + int i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +bool PubSubClient::connected() +{ + bool rc; + rc = (int)_client.is_connected(); +// if (!rc) _client.close(); + return rc; +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/PubSubClient/PubSubClient.h Fri Apr 03 08:29:43 2015 +0000 @@ -0,0 +1,76 @@ +/* +PubSubClient.h - A simple client for MQTT. +Nicholas O'Leary +http://knolleary.net +*/ +#include "mbed.h" +#include "GPRSInterface.h" +#include "TCPSocketConnection.h" + +#ifndef PubSubClient_h +#define PubSubClient_h + + + +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#define MQTT_MAX_PACKET_SIZE 128 + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#define MQTT_KEEPALIVE 15 + +#define MQTTPROTOCOLVERSION 3 +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +class PubSubClient { +private: + TCPSocketConnection _client; + char buffer[MQTT_MAX_PACKET_SIZE]; + int nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + void (*callback)(char*,char*,unsigned int); + int readPacket(int); + char readByte(); + bool write(short header, char* buf, int length); + int writeString(char* string, char* buf, int pos); + char* ip; + int port; +public: + PubSubClient(); + PubSubClient(char*, int, void(*)(char*,char*,unsigned int)); + bool connect(char *); + bool connect(char *, char *, char *); + bool connect(char *, char *, short, short, char *); + bool connect(char *, char *, char *, char *, short, short, char*); + void disconnect(); + bool publish(char *, char *); + bool publish(char *, char *, unsigned int); + bool publish(char *, char *, unsigned int, bool); +// bool publish_P(char *, short PROGMEM *, unsigned int, bool); + bool subscribe(char *); + bool unsubscribe(char *); + bool loop(); + bool connected(); +}; + + +#endif \ No newline at end of file
--- a/USBDevice.lib Fri Mar 06 09:22:41 2015 +0000 +++ b/USBDevice.lib Fri Apr 03 08:29:43 2015 +0000 @@ -1,1 +1,1 @@ -http://mbed.org/users/mbed_official/code/USBDevice/#4f589e246b9e +http://mbed.org/users/mbed_official/code/USBDevice/#0f216c4e75e5
--- a/main.cpp Fri Mar 06 09:22:41 2015 +0000 +++ b/main.cpp Fri Apr 03 08:29:43 2015 +0000 @@ -1,43 +1,57 @@ + +/* Copyright (c) 2010-2011 mbed.org, MIT License +* +* Permission is hereby granted, free of charge, to any person obtaining a copy of this software +* and associated documentation files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, copy, modify, merge, publish, +* distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all copies or +* substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + #include "mbed.h" #include "GPRSInterface.h" -#include "HTTPClient.h" - -#ifdef TARGET_ARCH_GPRS +#include "PubSubClient.h" #include "USBSerial.h" -#define LOG(args...) pc.printf(args) -USBSerial pc; -#else -#define LOG(args...) pc.printf(args) -Serial pc(USBTX, USBRX); -#endif + +#define NETWORK_APN "" -#define TEST_HTTP_GET 1 -#define TEST_HTTP_POST 1 -#define TEST_HTTP_PUT 1 -#define TEST_HTTP_DELETE 1 - -#define PIN_TX P1_27 -#define PIN_RX P1_26 +#define PIN_PWR P1_2 //power up gprs module +#define PIN_PWR_KEY P1_7 +#define PIN_TX P1_27 //Serial tx pin +#define PIN_RX P1_26 //Serial rx pin -GPRSInterface gprs(PIN_TX, PIN_RX, 115200, "uninet", NULL, NULL); -HTTPClient http; -char str[1024]; +DigitalOut power(PIN_PWR); +DigitalOut powerKey(PIN_PWR_KEY); -#ifdef TARGET_ARCH_GPRS -DigitalOut power(P1_2); -DigitalOut powerKey(P1_7); +USBSerial pc; + +//Serial uart(USBTX, USBRX); +GPRSInterface eth(PIN_TX, PIN_RX, NETWORK_APN, "", ""); -/* power pin: low enable - ___ - |___ +char* serverIpAddr = "opensensors.io"; /*Sever ip address*/ +int port = 1883; /*Sever Port*/ +void callback(char* topic, char* payload, unsigned int len); /*Callback function prototype*/ +PubSubClient mqtt(serverIpAddr, port, callback); - powerKey pin: you can also power up by long press the powerKey. +void callback(char* topic, char* payload, unsigned int len) +{ + pc.printf("Topic:%s\r\n", topic); + pc.printf("Payload:%s\r\n\r\n", payload); - ___ - ___| |___ + //Send incoming payloads back to topic "/mbed". + mqtt.publish("/users/yihui/message", payload, len); +} -*/ void gprsPowerUp(void) { power = 1; @@ -52,78 +66,59 @@ powerKey = 0; wait(3); } -#endif int main() { -#ifdef TARGET_ARCH_GPRS + pc.printf("Powering up!"); gprsPowerUp(); -#endif - gprs.init(); - - while(false == gprs.connect()) { - LOG("gprs connect error\n"); - wait(2); + wait(10); + + // Initialize the interface. + int s = eth.init(); + if (s != NULL) { + pc.printf(">>> Could not initialise. Halting!\n"); + exit(0); } - // successful DHCP - LOG("IP Address is %s\n", gprs.getIPAddress()); - - int ret; - HTTPMap map; - HTTPText inText(str, 1024); - HTTPText outText(str); - -#if TEST_HTTP_GET - //GET data - LOG("\nTrying to fetch page...\n"); - ret = http.get("http://developer.mbed.org/media/uploads/mbed_official/hello.txt", str, 1024); - if (!ret) { - LOG("Page fetched successfully - read %d characters\n", strlen(str)); - LOG("Result: %s\n", str); - } else { - LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode()); - } -#endif + pc.printf(">>> Get IP address...\n"); + while (1) { + s = eth.connect(); // Connect to network -#if TEST_HTTP_POST - //POST data - map.put("Hello", "World"); - map.put("test", "1234"); - LOG("\nTrying to post data...\n"); - ret = http.post("http://httpbin.org/post", map, &inText); - if (!ret) { - LOG("Executed POST successfully\n"); - } else { - LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode()); + if (s == false || s < 0) { + pc.printf(">>> Could not connect to network. Retrying!\n"); + wait(3); + } else { + break; + } } -#endif - -#if TEST_HTTP_PUT - //PUT data - strcpy(str, "This is a PUT test!"); - LOG("\nTrying to put resource...\n"); - ret = http.put("http://httpbin.org/put", outText, &inText); - if (!ret) { - LOG("Executed PUT successfully - read %d characters\n", strlen(str)); - LOG("Result: %s\n", str); - } else { - LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode()); + pc.printf(">>> Got IP address: %s\n", eth.getIPAddress()); + + char clientID[] = "1095"; /*Client nanme show for MQTT server*/ + char user[] = "yihui"; + char password[] = "cLkFPjQa"; + char pub_topic[] = "/users/yihui/test"; /*Publish to topic : "/users/kinan/test" */ + char sub_topic[] = "/users/yihui/test"; /*Subscribe to topic : "/users/kinan/test" */ + + +mqtt_connect: + while (!mqtt.connect(clientID, user, password)){ + pc.printf("\r\nConnect to server failed ..\r\n"); + pc.printf("wait 3 seconds. Retrying\n"); + wait(3); } -#endif - -#if TEST_HTTP_DELETE - //DELETE data - LOG("\nTrying to delete resource...\n"); - ret = http.del("http://httpbin.org/delete", &inText); - if (!ret) { - LOG("Executed DELETE successfully\n"); - } else { - LOG("Error - ret = %d - HTTP return code = %d\n", ret, http.getHTTPResponseCode()); + pc.printf("\r\nConnect to server sucessed ..\r\n"); + + mqtt.publish(pub_topic, "Hello from Arch GPRS"); + mqtt.subscribe(sub_topic); + + while(1) { + if (!mqtt.loop()) { + goto mqtt_connect; + } } -#endif - - gprs.disconnect(); - + + // Disconnect from network + //eth.disconnect(); + return 0; }
--- a/mbed.bld Fri Mar 06 09:22:41 2015 +0000 +++ b/mbed.bld Fri Apr 03 08:29:43 2015 +0000 @@ -1,1 +1,1 @@ -http://mbed.org/users/mbed_official/code/mbed/builds/7e07b6fb45cf \ No newline at end of file +http://mbed.org/users/mbed_official/code/mbed/builds/487b796308b0 \ No newline at end of file