Official reference client implementation for Cumulocity SmartREST on u-blox C027.
Dependencies: C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed
Fork of MbedSmartRestMain by
Diff: operation/OperationSupport.cpp
- Revision:
- 63:010bbbb4732a
- Parent:
- 62:86a04c5bda18
- Child:
- 65:a62dbef2f924
--- a/operation/OperationSupport.cpp Wed Oct 29 21:09:29 2014 +0000 +++ b/operation/OperationSupport.cpp Thu Oct 30 11:58:03 2014 +0000 @@ -1,5 +1,6 @@ #include "OperationSupport.h" #include <string.h> +#include "Aggregator.h" #include "ComposedRecord.h" #include "CharValue.h" #include "IntegerValue.h" @@ -15,7 +16,8 @@ _tpl(tpl), _deviceId(deviceId), _executor(client, tpl, deviceId), - _thread(OperationSupport::thread_func, this) + _thread1(OperationSupport::thread1_func, this), + _thread2(OperationSupport::thread2_func, this) { _init = false; } @@ -44,6 +46,9 @@ // Response: 211,<OPERATION/ID>,<STATUS> if (!_tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n")) return false; + + if (!_executor.init()) + return false; _init = true; return true; @@ -51,9 +56,18 @@ bool OperationSupport::run() { + if (!_store.hasPending()) { + if (!requestPendingOperations()) + return false; + } + + return true; +} + +bool OperationSupport::requestPendingOperations() +{ uint8_t ret; - OperationStore::Operation ops[100]; - size_t nops; + OperationStore::Operation op; ComposedRecord record; ParsedRecord received; @@ -63,17 +77,21 @@ if ((!record.add(msgId)) || (!record.add(deviceId))) return false; - puts("Operation support."); if (_client.send(record) != SMARTREST_SUCCESS) { _client.stop(); return false; } - nops = 0; while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) { - puts("Received operation."); - if (!operationFromRecord(received, ops[nops++])) + if (!operationFromRecord(received, op)) { puts("Operation conversion failed."); + continue; + } + if (!_store.enqueue(op)) { + puts("Cannot enqueue operation."); + continue; + } + puts("Enqueued operation."); } _client.stop(); @@ -83,12 +101,6 @@ return false; } - for (size_t i = 0; i < nops; i++) { - ops[i].state = OPERATION_SUCCESSFUL; - if (!updateOperation(ops[i])) - puts("Operation update failed."); - } - return true; } @@ -164,17 +176,71 @@ } } -void OperationSupport::thread() +void OperationSupport::thread1() { + OperationStore::Operation op; + bool ret; + + while (!_init) + Thread::yield(); + + while (true) { + if (!_store.takePending(op)) { + Thread::yield(); + continue; + } + + updateOperation(op); + ret = _executor.executeOperation(op); + _store.markAsDone(op, ret); + } +} + +void OperationSupport::thread2() +{ + OperationStore::Operation op; + Aggregator aggr(true); + while (!_init) Thread::yield(); + while (true) { + while ((!aggr.full()) && (_store.takeDone(op))) { + ComposedRecord record; + IntegerValue msgId(111); + IntegerValue operationId(op.identifier); + if ((!record.add(msgId)) || (!record.add(operationId)) || + (!record.add(operationStateValue(op)))) + break; + + if (!aggr.add(record)) + break; + } + + if (aggr.length() == 0) { + Thread::yield(); + continue; + } + + if (_client.send(aggr) != SMARTREST_SUCCESS) { + puts("Failed to update."); + } + _client.stop(); + aggr.clear(); + } } -void OperationSupport::thread_func(void const *arg) +void OperationSupport::thread1_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; - that->thread(); + that->thread1(); } + +void OperationSupport::thread2_func(void const *arg) +{ + OperationSupport *that; + that = (OperationSupport*)arg; + that->thread2(); +}