RTOs con capa MQTT

Dependencies:   ADS1015 EthernetInterface MQTT mbed-rtos mbed

Files at this revision

API Documentation at this revision

Comitter:
ramirobmar
Date:
Mon Aug 24 13:43:25 2015 +0000
Commit message:
Revision_01_01

Changed in this revision

ADS1015.lib Show annotated file Show diff for this revision Revisions of this file
EthernetInterface.lib Show annotated file Show diff for this revision Revisions of this file
MQTT.lib Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
mbed-rtos.lib Show annotated file Show diff for this revision Revisions of this file
mbed.bld Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ADS1015.lib	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/arve0/code/ADS1015/#aa277517f0ad
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/EthernetInterface.lib	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/mbed_official/code/EthernetInterface/#2fc406e2553f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTT.lib	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/teams/mqtt/code/MQTT/#37f007d2a8ae
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main.cpp	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,268 @@
+#include "mbed.h"
+#include "rtos.h"
+#include "Adafruit_ADS1015.h"
+#include "EthernetInterface.h"
+#include "MQTTPacket.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+
+#define LEN_BUFFER 100
+#define RES_ADC 1
+#define ID_SN_DEVICE 0x01
+#define READ_SIGNAL 1
+
+uint32_t measurement;
+uint32_t mean_value;
+uint16_t gain;
+uint32_t offset;
+uint32_t * buffer;
+uint8_t counter;
+Thread * thread1;
+Mutex * lock_buffer;
+Semaphore * sem_adc;
+Thread * thread2;
+Adafruit_ADS1015 * m_adc;
+
+EthernetInterface eth;
+TCPSocketConnection mysock; 
+char msg_err[81];
+char msg_control[81];
+char msg_status[81];
+int toStop = 0;
+
+int publish();
+    
+void set_values(void const *argument)
+{
+    uint8_t i;
+    uint64_t aux_value;
+    
+    while (true) {
+        
+        lock_buffer->lock(osWaitForever);
+        
+        for(i=0;i<counter;i++)
+        {
+            aux_value = aux_value +  buffer[i];  
+        }
+        mean_value = (uint32_t) aux_value / counter;
+        
+        lock_buffer->unlock();
+        publish();
+        
+        //Thread::signal_wait(READ_SIGNAL);
+        Thread::wait(10);
+    }
+    
+}
+
+
+
+void system_measurements_thread(void const *argument)
+{
+    uint16_t measurement;
+    uint32_t data_in;
+    
+    while (true) {
+        
+        
+        sem_adc->wait(osWaitForever);
+        measurement=m_adc->readADC_Differential_0_1(); 
+        sem_adc->release();
+        
+        data_in = measurement * gain - offset;
+        
+        lock_buffer->lock(osWaitForever);
+        buffer[counter++]=data_in;
+        if(counter == LEN_BUFFER) counter = 0;
+        lock_buffer->unlock();
+               
+        Thread::wait(10);
+          
+    }   
+}
+
+void init_system_measurements()
+{
+    m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor
+    m_adc->setGain(GAIN_ONE);
+    buffer = new uint32_t[LEN_BUFFER];
+    counter=0;
+    lock_buffer = new Mutex();
+    sem_adc = new Semaphore(RES_ADC);
+}
+
+int getdata(unsigned char* buf, int count)
+{
+    return mysock.receive((char *)buf, (size_t)count);
+}
+
+void init_system_network()
+{
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    MQTTString topicString = MQTTString_initializer;
+   
+    unsigned char buf[200];
+    int buflen = sizeof(buf);
+       
+    unsigned char payload[25]= "mypayload";
+    int payloadlen = strlen((char *)payload);
+    bool status_err = false;
+    
+    int len = 0;
+    int connack_rc;
+    unsigned char sessionPresent;
+    
+    unsigned char dup;
+    int qos;
+    unsigned char retained;
+    unsigned short msgid;
+    int payloadlen_in;
+    char* payload_in;
+    int rc;       
+    
+    
+    eth.init(); //Use DHCP
+    eth.connect();  
+    
+    
+    rc=mysock.connect("iot.eclipse.org", 1883); 
+    if(rc==-1)
+    {
+        eth.disconnect();
+        return;
+    }
+    
+    data.clientID.cstring = "SendReceive mbed MQTT ";
+    data.keepAliveInterval = 20;
+    data.cleansession = 1;
+    
+    mysock.set_blocking(true, 1000);  /* 1 second Timeout */
+    
+    len = MQTTSerialize_connect(buf, buflen, &data);
+    rc = mysock.send((char *)buf, len);
+    if(rc==-1)
+    {
+        eth.disconnect();
+        return;
+    
+    }
+
+    /* wait for connack */
+    if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
+    {
+        
+       if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0)
+        {
+            sprintf(msg_err,"Unable to connect, return code %d",connack_rc);
+            status_err = true;
+        }
+        else
+           status_err = false; 
+    }
+    else
+        status_err = true;
+        
+    if (!status_err)
+    {
+        topicString.cstring = "pubtopic";    
+        
+        while (!toStop)
+        {
+            if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
+            {
+                
+                MQTTString receivedTopic;
+                rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen);
+                sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in);
+            }
+
+            sprintf(msg_status,"publishing reading\n");
+            len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
+            rc = mysock.send((char *)buf, len);
+            
+        }
+
+    
+    }
+    else{
+        eth.disconnect();
+    }
+    
+    return;
+        
+        
+    
+
+}
+
+int publish()
+{
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    int rc = 0;
+    unsigned char buf[200];
+    int buflen = sizeof(buf);
+    TCPSocketConnection mysock; 
+    MQTTString topicString = MQTTString_initializer;
+    unsigned char payload[25] = "I'm alive!";
+    int payloadlen = strlen((char *)payload);
+    int len = 0;
+    
+    mysock.connect("m2m.eclipse.org", 1883);
+    
+    sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE);
+    data.keepAliveInterval = 20;
+    data.cleansession = 1;
+    data.MQTTVersion = 3;
+
+    len = MQTTSerialize_connect(buf, buflen, &data);
+
+    sprintf(topicString.cstring,"MESUREMENT:%d",mean_value);
+    
+    len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen);
+
+    len += MQTTSerialize_disconnect(buf + len, buflen - len);
+
+    rc = 0;
+    while (rc < len)
+    {
+        int rc1 = mysock.send((char *)buf, len);
+        if (rc1 == -1)
+        {
+            sprintf(msg_status,"Send failed");
+            
+            break;
+        }
+        else
+            rc += rc1;
+    }
+    if (rc == len)
+        sprintf(msg_status,"Send succeeded");
+        
+    wait(0.2);
+
+    return 0;
+}
+
+
+
+
+int main()
+{
+    
+    
+    init_system_measurements(); // Initialization system's measurement  
+    init_system_network();
+    
+    thread1 = new Thread(system_measurements_thread);
+    thread2 = new Thread(set_values);
+    
+    
+    while (true) {
+        
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mbed-rtos.lib	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/mbed_official/code/mbed-rtos/#21b438192b0f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mbed.bld	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,1 @@
+http://mbed.org/users/mbed_official/code/mbed/builds/b9ad9a133dc7
\ No newline at end of file