Official reference client implementation for Cumulocity SmartREST on u-blox C027.

Dependencies:   C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed

Fork of MbedSmartRestMain by Vincent Wochnik

Revision:
63:010bbbb4732a
Parent:
62:86a04c5bda18
Child:
65:a62dbef2f924
--- a/operation/OperationSupport.cpp	Wed Oct 29 21:09:29 2014 +0000
+++ b/operation/OperationSupport.cpp	Thu Oct 30 11:58:03 2014 +0000
@@ -1,5 +1,6 @@
 #include "OperationSupport.h"
 #include <string.h>
+#include "Aggregator.h"
 #include "ComposedRecord.h"
 #include "CharValue.h"
 #include "IntegerValue.h"
@@ -15,7 +16,8 @@
     _tpl(tpl),
     _deviceId(deviceId),
     _executor(client, tpl, deviceId),
-    _thread(OperationSupport::thread_func, this)
+    _thread1(OperationSupport::thread1_func, this),
+    _thread2(OperationSupport::thread2_func, this)
 {
     _init = false;
 }
@@ -44,6 +46,9 @@
     // Response: 211,<OPERATION/ID>,<STATUS>
     if (!_tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n"))
         return false;
+        
+    if (!_executor.init())
+        return false;
 
     _init = true;
     return true;
@@ -51,9 +56,18 @@
 
 bool OperationSupport::run()
 {
+    if (!_store.hasPending()) {
+        if (!requestPendingOperations())
+            return false;
+    }
+        
+    return true;
+}
+
+bool OperationSupport::requestPendingOperations()
+{
     uint8_t ret;
-    OperationStore::Operation ops[100];
-    size_t nops;
+    OperationStore::Operation op;
 
     ComposedRecord record;
     ParsedRecord received;
@@ -63,17 +77,21 @@
     if ((!record.add(msgId)) || (!record.add(deviceId)))
         return false;
 
-    puts("Operation support.");        
     if (_client.send(record) != SMARTREST_SUCCESS) {
         _client.stop();
         return false;
     }
     
-    nops = 0;
     while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) {
-        puts("Received operation.");
-        if (!operationFromRecord(received, ops[nops++]))
+        if (!operationFromRecord(received, op)) {
             puts("Operation conversion failed.");
+            continue;
+        }
+        if (!_store.enqueue(op)) {
+            puts("Cannot enqueue operation.");
+            continue;
+        }
+        puts("Enqueued operation.");
     }
     _client.stop();
 
@@ -83,12 +101,6 @@
         return false;
     }
     
-    for (size_t i = 0; i < nops; i++) {
-        ops[i].state = OPERATION_SUCCESSFUL;
-        if (!updateOperation(ops[i]))
-            puts("Operation update failed.");
-    }
-
     return true;
 }
 
@@ -164,17 +176,71 @@
     }
 }
 
-void OperationSupport::thread()
+void OperationSupport::thread1()
 {
+    OperationStore::Operation op;
+    bool ret;
+
+    while (!_init)
+        Thread::yield();
+    
+    while (true) {
+        if (!_store.takePending(op)) {
+            Thread::yield();
+            continue;
+        }
+        
+        updateOperation(op);
+        ret = _executor.executeOperation(op);
+        _store.markAsDone(op, ret);
+    }
+}
+
+void OperationSupport::thread2()
+{
+    OperationStore::Operation op;
+    Aggregator aggr(true);
+
     while (!_init)
         Thread::yield();
     
+    while (true) {
+        while ((!aggr.full()) && (_store.takeDone(op))) {
+            ComposedRecord record;
     
+            IntegerValue msgId(111);
+            IntegerValue operationId(op.identifier);
+            if ((!record.add(msgId)) || (!record.add(operationId)) ||
+                (!record.add(operationStateValue(op))))
+                break;
+            
+            if (!aggr.add(record))
+                break;
+        }
+        
+        if (aggr.length() == 0) {
+            Thread::yield();
+            continue;
+        }
+        
+        if (_client.send(aggr) != SMARTREST_SUCCESS) {
+            puts("Failed to update.");
+        }
+        _client.stop();
+        aggr.clear();        
+    }
 }
 
-void OperationSupport::thread_func(void const *arg)
+void OperationSupport::thread1_func(void const *arg)
 {
     OperationSupport *that;
     that = (OperationSupport*)arg;
-    that->thread();
+    that->thread1();
 }
+
+void OperationSupport::thread2_func(void const *arg)
+{
+    OperationSupport *that;
+    that = (OperationSupport*)arg;
+    that->thread2();
+}