MBED MQTT Lighting Endpoint for NXP LPC1768+AppBoard

Dependencies:   C12832_lcd EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Files at this revision

API Documentation at this revision

Comitter:
ansond
Date:
Sun Mar 16 17:30:59 2014 +0000
Parent:
128:5b7f7bd73117
Child:
130:9c52e163e733
Commit message:
updates

Changed in this revision

Definitions.h Show annotated file Show diff for this revision Revisions of this file
MQTTTransport.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTTransport.h Show annotated file Show diff for this revision Revisions of this file
endpoint_core.lib Show annotated file Show diff for this revision Revisions of this file
--- a/Definitions.h	Sun Mar 16 06:16:04 2014 +0000
+++ b/Definitions.h	Sun Mar 16 17:30:59 2014 +0000
@@ -101,6 +101,9 @@
 // Index for Transport to use for Update/Load
 #define LOAD_TRANSPORT              1                               // 0 - MQTT, 1 - HTTP
 
+// largest MQTT message that PubSubClient will send
+#define MAX_MQTT_MESSAGE_LENGTH     128
+
 // MQTT connect information
 #define MQTT_HOSTNAME               "iocana.bcu.ac.uk"              // IOC MQTT Broker Host - iocana.bcu.ac.uk
 #define MQTT_HOSTPORT               1883                            // IOC MQTT Broker Port
@@ -109,10 +112,14 @@
 #define MQTT_ENDPOINT_IDLEN         64                              // IOC MQTT Endpoint ID length (max)
 #define MQTT_IOC_TOPIC              "ARM/sensinode/control/%s"      // IOC MQTT Topic 
 #define MQTT_IOC_ANNOUNCE_TOPIC     "ARM/sensinode/control/all"     // IOC MQTT Topic (since MQTT on MBED cannot seem to handle wildcards)
+#define MQTT_PING_TOPIC             "ARM/mqtt/ping"                 // GW-Node Ping/Pong topic (to keep this endpoint MQTT connection alive)
 #define MQTT_IOC_ALL_ENDPOINT       "all"                           // must be the same as the last element of MATT_IOC_ANNOUNCE_TOPIC
 #define MQTT_IOC_TOPIC_LEN          64                              // max length for the topic string
 #define MQTT_PAYLOAD_SEGMENT_LEN    64                              // max length for a segment of the payload
 #define MQTT_USERNAME               ""                              // IOC MQTT Username
 #define MQTT_PASSWORD               ""                              // IOC MQTT Password
+#define MQTT_PING_VERB_LEN          10                              // Ping or Pong
+#define MQTT_PING_COUNTDOWN         1200                            // every 1200 250ms iterations (5 minutes)
+#define MQTT_MAX_COUNTER            32768                           // largest Ping counter before reset back to 1
 
 #endif // _DEFINITIONS_H
\ No newline at end of file
--- a/MQTTTransport.cpp	Sun Mar 16 06:16:04 2014 +0000
+++ b/MQTTTransport.cpp	Sun Mar 16 17:30:59 2014 +0000
@@ -29,7 +29,10 @@
 
  // MQTT callback to handle received messages
  void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
-    if (_mqtt_instance != NULL) _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+    if (_mqtt_instance != NULL) {
+        if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
+        else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+    }
  }
  
  // our MQTT client endpoint
@@ -40,6 +43,8 @@
      this->m_mqtt = NULL;
      _mqtt_instance = this;
      this->m_map = map;
+     this->m_ping_counter = 1;
+     this->m_ping_countdown = MQTT_PING_COUNTDOWN;
      this->initTopic();
  }
  
@@ -256,6 +261,79 @@
      return buffer;
  }
  
+ // is this message a PONG message?
+ bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
+     bool isPong = false;
+     char verb[MQTT_PING_VERB_LEN+1];
+     char ep_name[LIGHT_NAME_LEN+1];
+     int counter = 0;
+     
+     // clean
+     memset(verb,0,MQTT_PING_VERB_LEN+1);
+     memset(ep_name,0,LIGHT_NAME_LEN+1);
+     
+     // make sure we have the right topic
+     if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+         // parse the payload
+         for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
+         sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+         
+         // check the contents to make sure its for us...
+         if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {     
+             // its a PONG message to our PING... 
+             isPong = true;
+         }
+     }
+     
+     // return isPong status
+     return isPong;
+ }
+ 
+ 
+ // process this PONG message
+ void MQTTTransport::processPongMessage(char *payload,int payload_length) {
+     // DEBUG
+     this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
+             
+     // simply increment the counter
+     ++this->m_ping_counter;
+     
+     // reset counter if maxed 
+     if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
+ }
+ 
+ // send a PING message
+ bool MQTTTransport::sendPingMessage() {
+     bool sent = false;
+     char message[MAX_MQTT_MESSAGE_LENGTH+1];
+     
+     // initialize...
+     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+     
+     // build message
+     sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+     
+     // send the message over the ping/pong topic
+    sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+     if (sent) {
+         // send succeeded
+         this->logger()->log("PING %d sent successfully",this->m_ping_counter);
+         this->logger()->blinkTransportTxLED();
+     }
+     else {
+         // send failed! - reconnect
+         this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
+         
+         // attempt reconnect
+         this->disconnect();
+         sent = this->connect();
+         if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
+     }
+     
+     // return our status
+     return sent;
+ }
+ 
  // connect up MQTT
  bool MQTTTransport::connect() {
      char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
@@ -270,17 +348,24 @@
                  this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
                  if (this->m_mqtt->subscribe(this->getTopic())) {
                     if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
-                        this->logger()->log("MQTT CONNECTED.");
-                        this->m_connected = true;
+                        if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
+                            this->logger()->log("MQTT CONNECTED.");
+                            this->m_connected = true;
+                        }
+                        else {
+                            this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED",MQTT_PING_TOPIC);
+                            this->logger()->turnLEDRed();
+                            this->m_connected = false;
+                        }
                     }
                     else {
-                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",this->getTopic());
+                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
                         this->logger()->turnLEDRed();
                         this->m_connected = false;
                     }
                  }
                  else {
-                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
+                     this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
                      this->logger()->turnLEDRed();
                      this->m_connected = false;
                  }
@@ -308,7 +393,10 @@
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
          this->m_mqtt->unsubscribe(this->getTopic());
+         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
          this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
+         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
+         this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
          this->logger()->log("MQTT Disconnecting...");
          this->m_mqtt->disconnect();
      }
@@ -321,8 +409,17 @@
  
  // check transport and process stuff
  void MQTTTransport::checkAndProcess() {
+     // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
          this->m_mqtt->loop();
          this->logger()->blinkTransportRxLED();
      }
+     
+     // send a PING if time for it
+     --this->m_ping_countdown;
+     if (this->m_ping_countdown <= 0) {
+         this->logger()->log("MQTT: Sending PING...");
+         this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+         this->sendPingMessage();
+     }
  }
\ No newline at end of file
--- a/MQTTTransport.h	Sun Mar 16 06:16:04 2014 +0000
+++ b/MQTTTransport.h	Sun Mar 16 17:30:59 2014 +0000
@@ -28,15 +28,14 @@
 // MBED to IOC Resource Map
 #include "MBEDToIOCResourceMap.h"
 
-// largest MQTT message that PubSubClient will send
-#define MAX_MQTT_MESSAGE_LENGTH         128
-
 class MQTTTransport : public Transport {
     private:
         PubSubClient         *m_mqtt;
         MBEDToIOCResourceMap *m_map;
         char                  m_topic[MQTT_IOC_TOPIC_LEN+1];
         char                  m_endpoint_name[LIGHT_NAME_LEN+1];
+        int                   m_ping_counter;
+        int                   m_ping_countdown;
         
     public:
         MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map);
@@ -51,6 +50,9 @@
 
         char *getEndpointNameFromTopic(char *topic);
         
+        bool isPongMessage(char *topic,char *payload,int payload_length);
+        void processPongMessage(char *payload,int payload_length);
+        
     private:
         char *makeID(char *id_template, char *buffer);
         void initTopic();
@@ -58,6 +60,8 @@
         char *mapIOCResourceToEndpointResource(char *ioc_name);
         void sendResult(char *endpoint_name,char *parameter_name,char *value,bool success);
         MBEDToIOCResourceMap *getMap();
+        
+        bool sendPingMessage();
 };
 
 #endif // ___MQTTTRANSPORT_H_
\ No newline at end of file
--- a/endpoint_core.lib	Sun Mar 16 06:16:04 2014 +0000
+++ b/endpoint_core.lib	Sun Mar 16 17:30:59 2014 +0000
@@ -1,1 +1,1 @@
-http://mbed.org/users/ansond/code/endpoint_core/#673b2f43dfe7
+http://mbed.org/users/ansond/code/endpoint_core/#c34e2cf341a4