RTOs con capa MQTT
Dependencies: ADS1015 EthernetInterface MQTT mbed-rtos mbed
main.cpp
- Committer:
- ramirobmar
- Date:
- 2015-08-24
- Revision:
- 0:5c786957b6f8
File content as of revision 0:5c786957b6f8:
#include "mbed.h" #include "rtos.h" #include "Adafruit_ADS1015.h" #include "EthernetInterface.h" #include "MQTTPacket.h" #include <errno.h> #include <stdlib.h> #include <string.h> #define LEN_BUFFER 100 #define RES_ADC 1 #define ID_SN_DEVICE 0x01 #define READ_SIGNAL 1 uint32_t measurement; uint32_t mean_value; uint16_t gain; uint32_t offset; uint32_t * buffer; uint8_t counter; Thread * thread1; Mutex * lock_buffer; Semaphore * sem_adc; Thread * thread2; Adafruit_ADS1015 * m_adc; EthernetInterface eth; TCPSocketConnection mysock; char msg_err[81]; char msg_control[81]; char msg_status[81]; int toStop = 0; int publish(); void set_values(void const *argument) { uint8_t i; uint64_t aux_value; while (true) { lock_buffer->lock(osWaitForever); for(i=0;i<counter;i++) { aux_value = aux_value + buffer[i]; } mean_value = (uint32_t) aux_value / counter; lock_buffer->unlock(); publish(); //Thread::signal_wait(READ_SIGNAL); Thread::wait(10); } } void system_measurements_thread(void const *argument) { uint16_t measurement; uint32_t data_in; while (true) { sem_adc->wait(osWaitForever); measurement=m_adc->readADC_Differential_0_1(); sem_adc->release(); data_in = measurement * gain - offset; lock_buffer->lock(osWaitForever); buffer[counter++]=data_in; if(counter == LEN_BUFFER) counter = 0; lock_buffer->unlock(); Thread::wait(10); } } void init_system_measurements() { m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor m_adc->setGain(GAIN_ONE); buffer = new uint32_t[LEN_BUFFER]; counter=0; lock_buffer = new Mutex(); sem_adc = new Semaphore(RES_ADC); } int getdata(unsigned char* buf, int count) { return mysock.receive((char *)buf, (size_t)count); } void init_system_network() { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; MQTTString topicString = MQTTString_initializer; unsigned char buf[200]; int buflen = sizeof(buf); unsigned char payload[25]= "mypayload"; int payloadlen = strlen((char *)payload); bool status_err = false; int len = 0; int connack_rc; unsigned char sessionPresent; unsigned char dup; int qos; unsigned char retained; unsigned short msgid; int payloadlen_in; char* payload_in; int rc; eth.init(); //Use DHCP eth.connect(); rc=mysock.connect("iot.eclipse.org", 1883); if(rc==-1) { eth.disconnect(); return; } data.clientID.cstring = "SendReceive mbed MQTT "; data.keepAliveInterval = 20; data.cleansession = 1; mysock.set_blocking(true, 1000); /* 1 second Timeout */ len = MQTTSerialize_connect(buf, buflen, &data); rc = mysock.send((char *)buf, len); if(rc==-1) { eth.disconnect(); return; } /* wait for connack */ if (MQTTPacket_read(buf, buflen, getdata) == CONNACK) { if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0) { sprintf(msg_err,"Unable to connect, return code %d",connack_rc); status_err = true; } else status_err = false; } else status_err = true; if (!status_err) { topicString.cstring = "pubtopic"; while (!toStop) { if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH) { MQTTString receivedTopic; rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen); sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in); } sprintf(msg_status,"publishing reading\n"); len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen); rc = mysock.send((char *)buf, len); } } else{ eth.disconnect(); } return; } int publish() { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; int rc = 0; unsigned char buf[200]; int buflen = sizeof(buf); TCPSocketConnection mysock; MQTTString topicString = MQTTString_initializer; unsigned char payload[25] = "I'm alive!"; int payloadlen = strlen((char *)payload); int len = 0; mysock.connect("m2m.eclipse.org", 1883); sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE); data.keepAliveInterval = 20; data.cleansession = 1; data.MQTTVersion = 3; len = MQTTSerialize_connect(buf, buflen, &data); sprintf(topicString.cstring,"MESUREMENT:%d",mean_value); len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); len += MQTTSerialize_disconnect(buf + len, buflen - len); rc = 0; while (rc < len) { int rc1 = mysock.send((char *)buf, len); if (rc1 == -1) { sprintf(msg_status,"Send failed"); break; } else rc += rc1; } if (rc == len) sprintf(msg_status,"Send succeeded"); wait(0.2); return 0; } int main() { init_system_measurements(); // Initialization system's measurement init_system_network(); thread1 = new Thread(system_measurements_thread); thread2 = new Thread(set_values); while (true) { } }