MBED MQTT Lighting Endpoint for NXP LPC1768+AppBoard
Dependencies: C12832_lcd EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
Revision 129:c4fa24308e33, committed 2014-03-16
- Comitter:
- ansond
- Date:
- Sun Mar 16 17:30:59 2014 +0000
- Parent:
- 128:5b7f7bd73117
- Child:
- 130:9c52e163e733
- Commit message:
- updates
Changed in this revision
--- a/Definitions.h Sun Mar 16 06:16:04 2014 +0000 +++ b/Definitions.h Sun Mar 16 17:30:59 2014 +0000 @@ -101,6 +101,9 @@ // Index for Transport to use for Update/Load #define LOAD_TRANSPORT 1 // 0 - MQTT, 1 - HTTP +// largest MQTT message that PubSubClient will send +#define MAX_MQTT_MESSAGE_LENGTH 128 + // MQTT connect information #define MQTT_HOSTNAME "iocana.bcu.ac.uk" // IOC MQTT Broker Host - iocana.bcu.ac.uk #define MQTT_HOSTPORT 1883 // IOC MQTT Broker Port @@ -109,10 +112,14 @@ #define MQTT_ENDPOINT_IDLEN 64 // IOC MQTT Endpoint ID length (max) #define MQTT_IOC_TOPIC "ARM/sensinode/control/%s" // IOC MQTT Topic #define MQTT_IOC_ANNOUNCE_TOPIC "ARM/sensinode/control/all" // IOC MQTT Topic (since MQTT on MBED cannot seem to handle wildcards) +#define MQTT_PING_TOPIC "ARM/mqtt/ping" // GW-Node Ping/Pong topic (to keep this endpoint MQTT connection alive) #define MQTT_IOC_ALL_ENDPOINT "all" // must be the same as the last element of MATT_IOC_ANNOUNCE_TOPIC #define MQTT_IOC_TOPIC_LEN 64 // max length for the topic string #define MQTT_PAYLOAD_SEGMENT_LEN 64 // max length for a segment of the payload #define MQTT_USERNAME "" // IOC MQTT Username #define MQTT_PASSWORD "" // IOC MQTT Password +#define MQTT_PING_VERB_LEN 10 // Ping or Pong +#define MQTT_PING_COUNTDOWN 1200 // every 1200 250ms iterations (5 minutes) +#define MQTT_MAX_COUNTER 32768 // largest Ping counter before reset back to 1 #endif // _DEFINITIONS_H \ No newline at end of file
--- a/MQTTTransport.cpp Sun Mar 16 06:16:04 2014 +0000 +++ b/MQTTTransport.cpp Sun Mar 16 17:30:59 2014 +0000 @@ -29,7 +29,10 @@ // MQTT callback to handle received messages void _mqtt_message_handler(char *topic,char *payload,unsigned int length) { - if (_mqtt_instance != NULL) _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length); + if (_mqtt_instance != NULL) { + if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length); + else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length); + } } // our MQTT client endpoint @@ -40,6 +43,8 @@ this->m_mqtt = NULL; _mqtt_instance = this; this->m_map = map; + this->m_ping_counter = 1; + this->m_ping_countdown = MQTT_PING_COUNTDOWN; this->initTopic(); } @@ -256,6 +261,79 @@ return buffer; } + // is this message a PONG message? + bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) { + bool isPong = false; + char verb[MQTT_PING_VERB_LEN+1]; + char ep_name[LIGHT_NAME_LEN+1]; + int counter = 0; + + // clean + memset(verb,0,MQTT_PING_VERB_LEN+1); + memset(ep_name,0,LIGHT_NAME_LEN+1); + + // make sure we have the right topic + if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) { + // parse the payload + for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' '; + sscanf(payload,"%s %s %d",verb,ep_name,&counter); + + // check the contents to make sure its for us... + if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) { + // its a PONG message to our PING... + isPong = true; + } + } + + // return isPong status + return isPong; + } + + + // process this PONG message + void MQTTTransport::processPongMessage(char *payload,int payload_length) { + // DEBUG + this->logger()->log("Received PONG: counter=%d",this->m_ping_counter); + + // simply increment the counter + ++this->m_ping_counter; + + // reset counter if maxed + if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1; + } + + // send a PING message + bool MQTTTransport::sendPingMessage() { + bool sent = false; + char message[MAX_MQTT_MESSAGE_LENGTH+1]; + + // initialize... + memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); + + // build message + sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter); + + // send the message over the ping/pong topic + sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message)); + if (sent) { + // send succeeded + this->logger()->log("PING %d sent successfully",this->m_ping_counter); + this->logger()->blinkTransportTxLED(); + } + else { + // send failed! - reconnect + this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter); + + // attempt reconnect + this->disconnect(); + sent = this->connect(); + if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter); + } + + // return our status + return sent; + } + // connect up MQTT bool MQTTTransport::connect() { char mqtt_id[MQTT_ENDPOINT_IDLEN+1]; @@ -270,17 +348,24 @@ this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); if (this->m_mqtt->subscribe(this->getTopic())) { if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) { - this->logger()->log("MQTT CONNECTED."); - this->m_connected = true; + if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) { + this->logger()->log("MQTT CONNECTED."); + this->m_connected = true; + } + else { + this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED",MQTT_PING_TOPIC); + this->logger()->turnLEDRed(); + this->m_connected = false; + } } else { - this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",this->getTopic()); + this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC); this->logger()->turnLEDRed(); this->m_connected = false; } } else { - this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic()); + this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic()); this->logger()->turnLEDRed(); this->m_connected = false; } @@ -308,7 +393,10 @@ if (this->m_mqtt != NULL) { this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic()); this->m_mqtt->unsubscribe(this->getTopic()); + this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC); this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC); + this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC); + this->m_mqtt->unsubscribe(MQTT_PING_TOPIC); this->logger()->log("MQTT Disconnecting..."); this->m_mqtt->disconnect(); } @@ -321,8 +409,17 @@ // check transport and process stuff void MQTTTransport::checkAndProcess() { + // process any MQTT messages if (this->m_mqtt != NULL && this->m_connected == true) { this->m_mqtt->loop(); this->logger()->blinkTransportRxLED(); } + + // send a PING if time for it + --this->m_ping_countdown; + if (this->m_ping_countdown <= 0) { + this->logger()->log("MQTT: Sending PING..."); + this->m_ping_countdown = MQTT_PING_COUNTDOWN; + this->sendPingMessage(); + } } \ No newline at end of file
--- a/MQTTTransport.h Sun Mar 16 06:16:04 2014 +0000 +++ b/MQTTTransport.h Sun Mar 16 17:30:59 2014 +0000 @@ -28,15 +28,14 @@ // MBED to IOC Resource Map #include "MBEDToIOCResourceMap.h" -// largest MQTT message that PubSubClient will send -#define MAX_MQTT_MESSAGE_LENGTH 128 - class MQTTTransport : public Transport { private: PubSubClient *m_mqtt; MBEDToIOCResourceMap *m_map; char m_topic[MQTT_IOC_TOPIC_LEN+1]; char m_endpoint_name[LIGHT_NAME_LEN+1]; + int m_ping_counter; + int m_ping_countdown; public: MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map); @@ -51,6 +50,9 @@ char *getEndpointNameFromTopic(char *topic); + bool isPongMessage(char *topic,char *payload,int payload_length); + void processPongMessage(char *payload,int payload_length); + private: char *makeID(char *id_template, char *buffer); void initTopic(); @@ -58,6 +60,8 @@ char *mapIOCResourceToEndpointResource(char *ioc_name); void sendResult(char *endpoint_name,char *parameter_name,char *value,bool success); MBEDToIOCResourceMap *getMap(); + + bool sendPingMessage(); }; #endif // ___MQTTTRANSPORT_H_ \ No newline at end of file
--- a/endpoint_core.lib Sun Mar 16 06:16:04 2014 +0000 +++ b/endpoint_core.lib Sun Mar 16 17:30:59 2014 +0000 @@ -1,1 +1,1 @@ -http://mbed.org/users/ansond/code/endpoint_core/#673b2f43dfe7 +http://mbed.org/users/ansond/code/endpoint_core/#c34e2cf341a4