ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.
Dependents: m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more
Revision 17:9db4a86b876a, committed 2015-12-28
- Comitter:
- citrusbyte
- Date:
- Mon Dec 28 13:06:27 2015 +0000
- Parent:
- 16:7903152de19f
- Child:
- 18:510895884768
- Commit message:
- Update comments to TimeService
Changed in this revision
M2XStreamClient.cpp | Show annotated file Show diff for this revision Revisions of this file |
--- a/M2XStreamClient.cpp Mon Dec 28 12:48:19 2015 +0000 +++ b/M2XStreamClient.cpp Mon Dec 28 13:06:27 2015 +0000 @@ -1,527 +1,147 @@ #include "M2XStreamClient.h" -#include <jsonlite.h> - -#include "StreamParseFunctions.h" -#include "LocationParseFunctions.h" - -const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; - -static int write_delete_values(Print* print, const char* from, const char* end); -int print_encoded_string(Print* print, const char* str); -int tolower(int ch); - -#if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) -int tolower(int ch) -{ - // Arduino and mbed use ASCII table, so we can simplify the implementation - if ((ch >= 'A') && (ch <= 'Z')) { - return (ch + 32); - } - return ch; -} -#else -// For other platform, we use libc's tolower by default -#include <ctype.h> -#endif +static int fill_iso8601_timestamp(int32_t seconds, int32_t milli, + char* buffer, int* length); -M2XStreamClient::M2XStreamClient(Client* client, - const char* key, - int case_insensitive, - const char* host, - int port, - const char* path_prefix) : _client(client), - _key(key), - _case_insensitive(case_insensitive), - _host(host), - _port(port), - _path_prefix(path_prefix), - _null_print() { -} - -int M2XStreamClient::listStreamValues(const char* deviceId, const char* streamName, - stream_value_read_callback callback, void* context, - const char* query) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->print("GET "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->print("/values.json"); - - if (query) { - if (query[0] != '?') { - _client->print('?'); - } - _client->print(query); - } - - _client->println(" HTTP/1.0"); - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - int status = readStatusCode(false); - if (status == 200) { - readStreamValue(callback, context); - } - - close(); - return status; +TimeService::TimeService(M2XStreamClient* client) : _client(client) { } -int M2XStreamClient::readLocation(const char* deviceId, - location_read_callback callback, - void* context) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->print("GET "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->println("/location HTTP/1.0"); - - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - int status = readStatusCode(false); - if (status == 200) { - readLocation(callback, context); - } - - close(); - return status; -} - -int M2XStreamClient::deleteValues(const char* deviceId, const char* streamName, - const char* from, const char* end) { - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - int length = write_delete_values(&_null_print, from, end); - writeDeleteHeader(deviceId, streamName, length); - write_delete_values(_client, from, end); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; - } - - return readStatusCode(true); +int TimeService::init() { + _timer.start(); + return reset(); } -int M2XStreamClient::getTimestamp32(int32_t *ts) { - // The maximum value of signed 64-bit integer is 0x7fffffffffffffff, - // which is 9223372036854775807. It consists of 19 characters, so a - // buffer of 20 is definitely enough here - int length = 20; - char buffer[20]; - int status = getTimestamp(buffer, &length); - if (status == 200) { - int32_t result = 0; - for (int i = 0; i < length; i++) { - result = result * 10 + (buffer[i] - '0'); - } - if (ts != NULL) { *ts = result; } - } - return status; -} +int TimeService::reset() { + int32_t ts; + int status = _client->getTimestamp32(&ts); -int M2XStreamClient::getTimestamp(char* buffer, int *bufferLength) { - if (bufferLength == NULL) { return E_INVALID; } - if (_client->connect(_host, _port)) { - DBGLN("%s", "Connected to M2X server!"); - _client->println("GET /v2/time/seconds HTTP/1.0"); - - writeHttpHeader(-1); - } else { - DBGLN("%s", "ERROR: Cannot connect to M2X server!"); - return E_NOCONNECTION; + if (m2x_status_is_success(status)) { + _server_timestamp = ts; + _local_last_milli = _timer.read_ms(); } - int status = readStatusCode(false); - if (status == 200) { - int length = readContentLength(); - if (length < 0) { - close(); - return length; - } - if (*bufferLength < length) { - *bufferLength = length; - return E_BUFFER_TOO_SMALL; - } - *bufferLength = length; - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; - while (index < length) { - DBG("%s", "Received Data: "); - while ((index < length) && _client->available()) { - buffer[index++] = _client->read(); - DBG("%c", buffer[index - 1]); - } - DBGLNEND; - if ((!_client->connected()) && - (index < length)) { - close(); - return E_NOCONNECTION; - } - - delay(200); - } - } - close(); return status; } -static int write_delete_values(Print* print, const char* from, - const char* end) { - int bytes = 0; - bytes += print->print("{\"from\":\""); - bytes += print->print(from); - bytes += print->print("\",\"end\":\""); - bytes += print->print(end); - bytes += print->print("\"}"); - return bytes; -} - -// Encodes and prints string using Percent-encoding specified -// in RFC 1738, Section 2.2 -int print_encoded_string(Print* print, const char* str) { - int bytes = 0; - for (int i = 0; str[i] != 0; i++) { - if (((str[i] >= 'A') && (str[i] <= 'Z')) || - ((str[i] >= 'a') && (str[i] <= 'z')) || - ((str[i] >= '0') && (str[i] <= '9')) || - (str[i] == '-') || (str[i] == '_') || - (str[i] == '.') || (str[i] == '~')) { - bytes += print->print(str[i]); - } else { - // Encode all other characters - bytes += print->print('%'); - bytes += print->print(HEX(str[i] / 16)); - bytes += print->print(HEX(str[i] % 16)); - } - } - return bytes; -} - -void M2XStreamClient::writePutHeader(const char* deviceId, - const char* streamName, - int contentLength) { - _client->print("PUT "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->println("/value HTTP/1.0"); - - writeHttpHeader(contentLength); -} - -void M2XStreamClient::writeDeleteHeader(const char* deviceId, - const char* streamName, - int contentLength) { - _client->print("DELETE "); - if (_path_prefix) - _client->print(_path_prefix); - _client->print("/v2/devices/"); - print_encoded_string(_client, deviceId); - _client->print("/streams/"); - print_encoded_string(_client, streamName); - _client->print("/values"); - _client->println(" HTTP/1.0"); - - writeHttpHeader(contentLength); -} - -void M2XStreamClient::writeHttpHeader(int contentLength) { - _client->println(USER_AGENT); - _client->print("X-M2X-KEY: "); - _client->println(_key); - - _client->print("Host: "); - print_encoded_string(_client, _host); - if (_port != kDefaultM2XPort) { - _client->print(":"); - // port is an integer, does not need encoding - _client->print(_port); +int TimeService::getTimestamp(char* buffer, int* length) { + uint32_t now = _timer.read_ms(); + if (now < _local_last_milli) { + // In case of a timestamp overflow, we reset the server timestamp recorded. + // Notice that unlike Arduino, mbed would overflow every 30 minutes, + // so if 2 calls to this API are more than 30 minutes apart, we are + // likely to run into troubles. This is a limitation of the current + // mbed platform. However, we argue that it might be a rare case that + // 2 calls to this are 30 minutes apart. In most cases, we would call + // this API every few seconds or minutes, this won't be a huge problem. + // However, if you have a use case that would require 2 intervening + // calls to this be 30 minutes apart, you might want to leverage a RTC + // clock instead of the simple ticker here. + int status = reset(); + if (!m2x_status_is_success(status)) { return status; } + now = _timer.read_ms(); } - _client->println(); - - if (contentLength > 0) { - _client->println("Content-Type: application/json"); - DBG("%s", "Content Length: "); - DBGLN("%d", contentLength); - - _client->print("Content-Length: "); - _client->println(contentLength); + if (now < _local_last_milli) { + // We have already reseted the timestamp, so this cannot happen, + // an HTTP request cannot last 30 minutes, something else must be wrong + // here. + return E_TIMESTAMP_ERROR; } - _client->println(); -} - -int M2XStreamClient::waitForString(const char* str) { - int currentIndex = 0; - if (str[currentIndex] == '\0') return E_OK; - - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - int cmp; - if (_case_insensitive) { - cmp = tolower(c) - tolower(str[currentIndex]); - } else { - cmp = c - str[currentIndex]; - } - - if ((str[currentIndex] == '*') || (cmp == 0)) { - currentIndex++; - if (str[currentIndex] == '\0') { - return E_OK; - } - } else { - // start from the beginning - currentIndex = 0; - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - close(); - return E_DISCONNECTED; - } - - delay(1000); - } - // never reached here - return E_NOTREACHABLE; -} - -int M2XStreamClient::readStatusCode(bool closeClient) { - int responseCode = 0; - int ret = waitForString("HTTP/*.* "); - if (ret != E_OK) { - if (closeClient) close(); - return ret; - } - - // ret is not needed from here(since it must be E_OK), so we can use it - // as a regular variable now. - ret = 0; - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - responseCode = responseCode * 10 + (c - '0'); - ret++; - if (ret == 3) { - if (closeClient) close(); - return responseCode; - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - if (closeClient) close(); - return E_DISCONNECTED; - } - - delay(1000); - } - - // never reached here - return E_NOTREACHABLE; + uint32_t diff = now - _local_last_milli; + _local_last_milli = now; + _server_timestamp += (int32_t) (diff / 1000); // Milliseconds to seconds + return fill_iso8601_timestamp(_server_timestamp, (int32_t) (diff % 1000), + buffer, length); } -int M2XStreamClient::readContentLength() { - int ret = waitForString("Content-Length: "); - if (ret != E_OK) { - return ret; - } - - // From now on, ret is not needed, we can use it - // to keep the final result - ret = 0; - while (true) { - while (_client->available()) { - char c = _client->read(); - DBG("%c", c); - - if ((c == '\r') || (c == '\n')) { - return (ret == 0) ? (E_INVALID) : (ret); - } else { - ret = ret * 10 + (c - '0'); - } - } - - if (!_client->connected()) { - DBGLN("%s", "ERROR: The client is disconnected from the server!"); - - return E_DISCONNECTED; - } +#define SIZE_ISO_8601 25 +static inline bool is_leap_year(int16_t y) { + return ((1970 + y) > 0) && + !((1970 + y) % 4) && + (((1970 + y) % 100) || !((1970 + y) % 400)); +} +static inline int32_t days_in_year(int16_t y) { + return is_leap_year(y) ? 366 : 365; +} +static const uint8_t MONTH_DAYS[]={31,28,31,30,31,30,31,31,30,31,30,31}; - delay(1000); - } - - // never reached here - return E_NOTREACHABLE; -} - -int M2XStreamClient::skipHttpHeader() { - return waitForString("\n\r\n"); -} +static int fill_iso8601_timestamp(int32_t timestamp, int32_t milli, + char* buffer, int* length) { + int16_t year; + int8_t month, month_length; + int32_t day; + int8_t hour, minute, second; -void M2XStreamClient::close() { - // Eats up buffered data before closing - _client->flush(); - _client->stop(); -} - -int M2XStreamClient::readStreamValue(stream_value_read_callback callback, - void* context) { - const int BUF_LEN = 64; - char buf[BUF_LEN]; - - int length = readContentLength(); - if (length < 0) { - close(); - return length; + if (*length < SIZE_ISO_8601) { + *length = SIZE_ISO_8601; + return E_BUFFER_TOO_SMALL; } - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; + second = timestamp % 60; + timestamp /= 60; // now it is minutes - stream_parsing_context_state state; - state.state = state.index = 0; - state.callback = callback; - state.context = context; + minute = timestamp % 60; + timestamp /= 60; // now it is hours - jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; - cbs.key_found = on_stream_key_found; - cbs.number_found = on_stream_number_found; - cbs.string_found = on_stream_string_found; - cbs.context.client_state = &state; - - jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); - jsonlite_parser_set_callback(p, &cbs); + hour = timestamp % 24; + timestamp /= 24; // now it is days - jsonlite_result result = jsonlite_result_unknown; - while (index < length) { - int i = 0; - - DBG("%s", "Received Data: "); - while ((i < BUF_LEN) && _client->available()) { - buf[i++] = _client->read(); - DBG("%c", buf[i - 1]); - } - DBGLNEND; + year = 0; + day = 0; + while ((day += days_in_year(year)) <= timestamp) { + year++; + } + day -= days_in_year(year); + timestamp -= day; // now it is days in this year, starting at 0 - if ((!_client->connected()) && - (!_client->available()) && - ((index + i) < length)) { - jsonlite_parser_release(p); - close(); - return E_NOCONNECTION; - } - - result = jsonlite_parser_tokenize(p, buf, i); - if ((result != jsonlite_result_ok) && - (result != jsonlite_result_end_of_stream)) { - jsonlite_parser_release(p); - close(); - return E_JSON_INVALID; + day = 0; + month_length = 0; + for (month = 0; month < 12; month++) { + if (month == 1) { + // February + month_length = is_leap_year(year) ? 29 : 28; + } else { + month_length = MONTH_DAYS[month]; } - index += i; - } - - jsonlite_parser_release(p); - close(); - return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); -} - -int M2XStreamClient::readLocation(location_read_callback callback, - void* context) { - const int BUF_LEN = 40; - char buf[BUF_LEN]; - - int length = readContentLength(); - if (length < 0) { - close(); - return length; + if (timestamp >= month_length) { + timestamp -= month_length; + } else { + break; + } } + year = 1970 + year; + month++; // offset by 1 + day = timestamp + 1; - int index = skipHttpHeader(); - if (index != E_OK) { - close(); - return index; - } - index = 0; + int i = 0, j = 0; - location_parsing_context_state state; - state.state = state.index = 0; - state.callback = callback; - state.context = context; - - jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; - cbs.key_found = on_location_key_found; - cbs.string_found = on_location_string_found; - cbs.context.client_state = &state; + // NOTE: It seems the snprintf implementation in Arduino has bugs, + // we have to manually piece the string together here. +#define INT_TO_STR(v_, width_) \ + for (j = 0; j < (width_); j++) { \ + buffer[i + (width_) - 1 - j] = '0' + ((v_) % 10); \ + (v_) /= 10; \ + } \ + i += (width_) - jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); - jsonlite_parser_set_callback(p, &cbs); - - jsonlite_result result = jsonlite_result_unknown; - while (index < length) { - int i = 0; - - DBG("%s", "Received Data: "); - while ((i < BUF_LEN) && _client->available()) { - buf[i++] = _client->read(); - DBG("%c", buf[i - 1]); - } - DBGLNEND; + INT_TO_STR(year, 4); + buffer[i++] = '-'; + INT_TO_STR(month, 2); + buffer[i++] = '-'; + INT_TO_STR(day, 2); + buffer[i++] = 'T'; + INT_TO_STR(hour, 2); + buffer[i++] = ':'; + INT_TO_STR(minute, 2); + buffer[i++] = ':'; + INT_TO_STR(second, 2); + buffer[i++] = '.'; + INT_TO_STR(milli, 3); + buffer[i++] = 'Z'; + buffer[i++] = '\0'; - if ((!_client->connected()) && - (!_client->available()) && - ((index + i) < length)) { - jsonlite_parser_release(p); - close(); - return E_NOCONNECTION; - } +#undef INT_TO_STR - result = jsonlite_parser_tokenize(p, buf, i); - if ((result != jsonlite_result_ok) && - (result != jsonlite_result_end_of_stream)) { - jsonlite_parser_release(p); - close(); - return E_JSON_INVALID; - } - - index += i; - } - - jsonlite_parser_release(p); - close(); - return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); + *length = i; + return E_OK; }