RTOs con capa MQTT

Dependencies:   ADS1015 EthernetInterface MQTT mbed-rtos mbed

main.cpp

Committer:
ramirobmar
Date:
2015-08-24
Revision:
0:5c786957b6f8

File content as of revision 0:5c786957b6f8:

#include "mbed.h"
#include "rtos.h"
#include "Adafruit_ADS1015.h"
#include "EthernetInterface.h"
#include "MQTTPacket.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>


#define LEN_BUFFER 100
#define RES_ADC 1
#define ID_SN_DEVICE 0x01
#define READ_SIGNAL 1

uint32_t measurement;
uint32_t mean_value;
uint16_t gain;
uint32_t offset;
uint32_t * buffer;
uint8_t counter;
Thread * thread1;
Mutex * lock_buffer;
Semaphore * sem_adc;
Thread * thread2;
Adafruit_ADS1015 * m_adc;

EthernetInterface eth;
TCPSocketConnection mysock; 
char msg_err[81];
char msg_control[81];
char msg_status[81];
int toStop = 0;

int publish();
    
void set_values(void const *argument)
{
    uint8_t i;
    uint64_t aux_value;
    
    while (true) {
        
        lock_buffer->lock(osWaitForever);
        
        for(i=0;i<counter;i++)
        {
            aux_value = aux_value +  buffer[i];  
        }
        mean_value = (uint32_t) aux_value / counter;
        
        lock_buffer->unlock();
        publish();
        
        //Thread::signal_wait(READ_SIGNAL);
        Thread::wait(10);
    }
    
}



void system_measurements_thread(void const *argument)
{
    uint16_t measurement;
    uint32_t data_in;
    
    while (true) {
        
        
        sem_adc->wait(osWaitForever);
        measurement=m_adc->readADC_Differential_0_1(); 
        sem_adc->release();
        
        data_in = measurement * gain - offset;
        
        lock_buffer->lock(osWaitForever);
        buffer[counter++]=data_in;
        if(counter == LEN_BUFFER) counter = 0;
        lock_buffer->unlock();
               
        Thread::wait(10);
          
    }   
}

void init_system_measurements()
{
    m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor
    m_adc->setGain(GAIN_ONE);
    buffer = new uint32_t[LEN_BUFFER];
    counter=0;
    lock_buffer = new Mutex();
    sem_adc = new Semaphore(RES_ADC);
}

int getdata(unsigned char* buf, int count)
{
    return mysock.receive((char *)buf, (size_t)count);
}

void init_system_network()
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    MQTTString topicString = MQTTString_initializer;
   
    unsigned char buf[200];
    int buflen = sizeof(buf);
       
    unsigned char payload[25]= "mypayload";
    int payloadlen = strlen((char *)payload);
    bool status_err = false;
    
    int len = 0;
    int connack_rc;
    unsigned char sessionPresent;
    
    unsigned char dup;
    int qos;
    unsigned char retained;
    unsigned short msgid;
    int payloadlen_in;
    char* payload_in;
    int rc;       
    
    
    eth.init(); //Use DHCP
    eth.connect();  
    
    
    rc=mysock.connect("iot.eclipse.org", 1883); 
    if(rc==-1)
    {
        eth.disconnect();
        return;
    }
    
    data.clientID.cstring = "SendReceive mbed MQTT ";
    data.keepAliveInterval = 20;
    data.cleansession = 1;
    
    mysock.set_blocking(true, 1000);  /* 1 second Timeout */
    
    len = MQTTSerialize_connect(buf, buflen, &data);
    rc = mysock.send((char *)buf, len);
    if(rc==-1)
    {
        eth.disconnect();
        return;
    
    }

    /* wait for connack */
    if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
    {
        
       if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0)
        {
            sprintf(msg_err,"Unable to connect, return code %d",connack_rc);
            status_err = true;
        }
        else
           status_err = false; 
    }
    else
        status_err = true;
        
    if (!status_err)
    {
        topicString.cstring = "pubtopic";    
        
        while (!toStop)
        {
            if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
            {
                
                MQTTString receivedTopic;
                rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen);
                sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in);
            }

            sprintf(msg_status,"publishing reading\n");
            len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
            rc = mysock.send((char *)buf, len);
            
        }

    
    }
    else{
        eth.disconnect();
    }
    
    return;
        
        
    

}

int publish()
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 0;
    unsigned char buf[200];
    int buflen = sizeof(buf);
    TCPSocketConnection mysock; 
    MQTTString topicString = MQTTString_initializer;
    unsigned char payload[25] = "I'm alive!";
    int payloadlen = strlen((char *)payload);
    int len = 0;
    
    mysock.connect("m2m.eclipse.org", 1883);
    
    sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE);
    data.keepAliveInterval = 20;
    data.cleansession = 1;
    data.MQTTVersion = 3;

    len = MQTTSerialize_connect(buf, buflen, &data);

    sprintf(topicString.cstring,"MESUREMENT:%d",mean_value);
    
    len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen);

    len += MQTTSerialize_disconnect(buf + len, buflen - len);

    rc = 0;
    while (rc < len)
    {
        int rc1 = mysock.send((char *)buf, len);
        if (rc1 == -1)
        {
            sprintf(msg_status,"Send failed");
            
            break;
        }
        else
            rc += rc1;
    }
    if (rc == len)
        sprintf(msg_status,"Send succeeded");
        
    wait(0.2);

    return 0;
}




int main()
{
    
    
    init_system_measurements(); // Initialization system's measurement  
    init_system_network();
    
    thread1 = new Thread(system_measurements_thread);
    thread2 = new Thread(set_values);
    
    
    while (true) {
        
    }

}