ARM mbed M2X API Client: The ARM mbed client library is used to send/receive data to/from AT&T's M2X service from mbed LPC1768 microcontrollers.

Dependents:   m2x-demo-all M2X_MTS_ACCEL_DEMO M2X_MTS_Accel M2X_K64F_ACCEL ... more

Files at this revision

API Documentation at this revision

Comitter:
citrusbyte
Date:
Thu Oct 24 12:22:33 2013 +0000
Child:
6:e6d66d99dd6f
Commit message:
Initial commit for M2X mbed client library

Changed in this revision

Client.cpp Show annotated file Show diff for this revision Revisions of this file
Client.h Show annotated file Show diff for this revision Revisions of this file
LocationParseFunctions.h Show annotated file Show diff for this revision Revisions of this file
M2XStreamClient.cpp Show annotated file Show diff for this revision Revisions of this file
M2XStreamClient.h Show annotated file Show diff for this revision Revisions of this file
NullPrint.h Show annotated file Show diff for this revision Revisions of this file
Print.cpp Show annotated file Show diff for this revision Revisions of this file
Print.h Show annotated file Show diff for this revision Revisions of this file
StreamParseFunctions.h Show annotated file Show diff for this revision Revisions of this file
Utility.cpp Show annotated file Show diff for this revision Revisions of this file
Utility.h Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Client.cpp	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,61 @@
+#include "Client.h"
+#include "mbed.h"
+
+#include <stdint.h>
+
+Client::Client() : _len(0), _sock() {
+}
+
+Client::~Client() {
+}
+
+int Client::connect(const char *host, uint16_t port) {
+  return _sock.connect(host, port) == 0;
+}
+
+size_t Client::write(uint8_t b) {
+  return write(&b, 1);
+}
+
+size_t Client::write(const uint8_t *buf, size_t size) {
+  _sock.set_blocking(false, 15000);
+  // NOTE: we know it's dangerous to cast from (const uint8_t *) to (char *),
+  // but we are trying to maintain a stable interface between the Arduino
+  // one and the mbed one. What's more, while TCPSocketConnection has no
+  // intention of modifying the data here, it requires us to send a (char *)
+  // typed data. So we belive it's safe to do the cast here.
+  return _sock.send_all(const_cast<char*>((const char*) buf), size);
+}
+
+int Client::available() {
+  if (_len > 0) { return 1; }
+  int ret = read(_buf, 1);
+  if (ret <= 0) { return 0; }
+  _len = ret;
+  return 1;
+}
+
+int Client::read() {
+  if (_len > 0) {
+    _len = 0;
+    return _buf[0];
+  }
+  return -1;
+}
+
+int Client::read(uint8_t *buf, size_t size) {
+  return _sock.receive_all((char*) buf, size);
+}
+
+void Client::flush() {
+  // does nothing, TCP stack takes care of this
+}
+
+void Client::stop() {
+  _sock.close();
+}
+
+uint8_t Client::connected() {
+  return _sock.is_connected();
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Client.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,31 @@
+#ifndef Client_h
+#define Client_h
+
+#include "Print.h"
+#include "TCPSocketConnection.h"
+
+/*
+ * TCP Client
+ */
+class Client : public Print {
+public:
+  Client();
+  ~Client();
+
+  virtual int connect(const char *host, uint16_t port);
+  virtual size_t write(uint8_t);
+  virtual size_t write(const uint8_t *buf, size_t size);
+  virtual int available();
+  virtual int read();
+  virtual void flush();
+  virtual void stop();
+  virtual uint8_t connected();
+private:
+  virtual int read(uint8_t *buf, size_t size);
+  uint8_t _buf[1];
+  uint8_t _len;
+  TCPSocketConnection _sock;
+};
+
+#endif
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/LocationParseFunctions.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,102 @@
+#ifndef LocationParseFunctions_h
+#define LocationParseFunctions_h
+
+// Data structures and functions used to parse locations
+
+#define LOCATION_BUF_LEN 20
+
+typedef struct {
+  uint16_t state;
+  char name_str[LOCATION_BUF_LEN + 1];
+  double latitude;
+  double longitude;
+  double elevation;
+  char timestamp_str[LOCATION_BUF_LEN + 1];
+  int index;
+
+  location_read_callback callback;
+  void* context;
+} location_parsing_context_state;
+
+#define WAITING_NAME 0x1
+#define WAITING_LATITUDE 0x2
+#define WAITING_LONGITUDE 0x4
+#define WAITING_ELEVATION 0x8
+#define WAITING_TIMESTAMP 0x10
+
+#define GOT_NAME 0x20
+#define GOT_LATITUDE 0x40
+#define GOT_LONGITUDE 0x80
+#define GOT_ELEVATION 0x100
+#define GOT_TIMESTAMP 0x200
+
+#define GOT_LOCATION (GOT_NAME | GOT_LATITUDE | GOT_LONGITUDE | GOT_ELEVATION | GOT_TIMESTAMP)
+#define TEST_GOT_LOCATION(state_) (((state_) & GOT_LOCATION) == GOT_LOCATION)
+
+#define TEST_IS_NAME(state_) (((state_) & (WAITING_NAME | GOT_NAME)) == WAITING_NAME)
+#define TEST_IS_LATITUDE(state_) (((state_) & (WAITING_LATITUDE | GOT_LATITUDE)) \
+                                  == WAITING_LATITUDE)
+#define TEST_IS_LONGITUDE(state_) (((state_) & (WAITING_LONGITUDE | GOT_LONGITUDE)) \
+                                   == WAITING_LONGITUDE)
+#define TEST_IS_ELEVATION(state_) (((state_) & (WAITING_ELEVATION | GOT_ELEVATION)) \
+                                   == WAITING_ELEVATION)
+#define TEST_IS_TIMESTAMP(state_) (((state_) & (WAITING_TIMESTAMP | GOT_TIMESTAMP)) \
+                                   == WAITING_TIMESTAMP)
+
+static void on_location_key_found(jsonlite_callback_context* context,
+                                  jsonlite_token* token) {
+  location_parsing_context_state* state =
+      (location_parsing_context_state*) context->client_state;
+  if (strncmp((const char*) token->start, "waypoints", 9) == 0) {
+    // only parses those locations in waypoints, skip the outer one
+    state->state = 0;
+  } else if (strncmp((const char*) token->start, "name", 4) == 0) {
+    state->state |= WAITING_NAME;
+  } else if (strncmp((const char*) token->start, "latitude", 8) == 0) {
+    state->state |= WAITING_LATITUDE;
+  } else if (strncmp((const char*) token->start, "longitude", 9) == 0) {
+    state->state |= WAITING_LONGITUDE;
+  } else if (strncmp((const char*) token->start, "elevation", 9) == 0) {
+    state->state |= WAITING_ELEVATION;
+  } else if (strncmp((const char*) token->start, "timestamp", 9) == 0) {
+    state->state |= WAITING_TIMESTAMP;
+  }
+}
+
+static void on_location_string_found(jsonlite_callback_context* context,
+                                     jsonlite_token* token) {
+  location_parsing_context_state* state =
+      (location_parsing_context_state*) context->client_state;
+
+  if (TEST_IS_NAME(state->state)) {
+    strncpy(state->name_str, (const char*) token->start,
+            MIN(token->end - token->start, LOCATION_BUF_LEN));
+    state->name_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
+    state->state |= GOT_NAME;
+  } else if (TEST_IS_LATITUDE(state->state)) {
+    state->latitude = atof((const char*) token->start);
+    state->state |= GOT_LATITUDE;
+  } else if (TEST_IS_LONGITUDE(state->state)) {
+    state->longitude = atof((const char*) token->start);
+    state->state |= GOT_LONGITUDE;
+  } else if (TEST_IS_ELEVATION(state->state)) {
+    state->elevation = atof((const char*) token->start);
+    state->state |= GOT_ELEVATION;
+  } else if (TEST_IS_TIMESTAMP(state->state)) {
+    strncpy(state->timestamp_str, (const char*) token->start,
+            MIN(token->end - token->start, LOCATION_BUF_LEN));
+    state->timestamp_str[MIN(token->end - token->start, LOCATION_BUF_LEN)] = '\0';
+    state->state |= GOT_TIMESTAMP;
+  }
+
+  if (TEST_GOT_LOCATION(state->state)) {
+    state->callback(state->name_str, state->latitude, state->longitude,
+                    state->elevation, state->timestamp_str, state->index++,
+                    state->context);
+    state->state = 0;
+  }
+}
+
+#endif  /* LocationParseFunctions_h */
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/M2XStreamClient.cpp	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,584 @@
+#include "M2XStreamClient.h"
+
+#include <jsonlite.h>
+
+#include "StreamParseFunctions.h"
+#include "LocationParseFunctions.h"
+
+#define HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
+#define MAX_DOUBLE_DIGITS 7
+
+const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
+const char* kUserAgentLine = "User-Agent: M2X Arduino Client/0.1";
+
+static int print_encoded_string(Print* print, const char* str);
+
+M2XStreamClient::M2XStreamClient(Client* client,
+                                 const char* key,
+                                 const char* host,
+                                 int port) : _client(client),
+                                             _key(key),
+                                             _host(host),
+                                             _port(port),
+                                             _null_print() {
+}
+
+int M2XStreamClient::send(const char* feedId,
+                          const char* streamName,
+                          double value) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    writeSendHeader(feedId, streamName,
+                    // 6 for "value="
+                    _null_print.print(value) + 6);
+    _client->print("value=");
+    // value is a double, does not need encoding
+    _client->print(value);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::send(const char* feedId,
+                          const char* streamName,
+                          long value) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    writeSendHeader(feedId, streamName,
+                    // 6 for "value="
+                    _null_print.print(value) + 6);
+
+    _client->print("value=");
+    // value is a long, does not need encoding
+    _client->print(value);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::send(const char* feedId,
+                          const char* streamName,
+                          int value) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    writeSendHeader(feedId, streamName,
+                    // 6 for "value="
+                    _null_print.print(value) + 6);
+
+    _client->print("value=");
+    // value is an int, does not need encoding
+    _client->print(value);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::send(const char* feedId,
+                          const char* streamName,
+                          const char* value) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    writeSendHeader(feedId, streamName,
+                    // 6 for "value="
+                    _null_print.print(value) + 6);
+
+    _client->print("value=");
+    print_encoded_string(_client, value);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::receive(const char* feedId, const char* streamName,
+                             stream_value_read_callback callback, void* context) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    _client->print("GET /v1/feeds/");
+    print_encoded_string(_client, feedId);
+    _client->print("/streams/");
+    print_encoded_string(_client, streamName);
+    _client->println("/values HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readStreamValue(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+int M2XStreamClient::readLocation(const char* feedId,
+                                  location_read_callback callback,
+                                  void* context) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+    _client->print("GET /v1/feeds/");
+    print_encoded_string(_client, feedId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(-1);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+  int status = readStatusCode(false);
+  if (status == 200) {
+    readLocation(callback, context);
+  }
+
+  close();
+  return status;
+}
+
+// Encodes and prints string using Percent-encoding specified
+// in RFC 1738, Section 2.2
+static int print_encoded_string(Print* print, const char* str) {
+  int bytes = 0;
+  for (int i = 0; str[i] != 0; i++) {
+    if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
+        ((str[i] >= 'a') && (str[i] <= 'z')) ||
+        ((str[i] >= '0') && (str[i] <= '9')) ||
+        (str[i] == '-') || (str[i] == '_') ||
+        (str[i] == '.') || (str[i] == '~')) {
+      bytes += print->print(str[i]);
+    } else {
+      // Encode all other characters
+      bytes += print->print('%');
+      bytes += print->print(HEX(str[i] / 16));
+      bytes += print->print(HEX(str[i] % 16));
+    }
+  }
+  return bytes;
+}
+
+static int write_location_data(Print* print, const char* name,
+                               double latitude, double longitude,
+                               double elevation) {
+  int bytes = 0;
+  bytes += print->print("name=");
+  bytes += print_encoded_string(print, name);
+  bytes += print->print("&latitude=");
+  bytes += print->print(latitude, MAX_DOUBLE_DIGITS);
+  bytes += print->print("&longitude=");
+  bytes += print->print(longitude, MAX_DOUBLE_DIGITS);
+  bytes += print->print("&elevation=");
+  bytes += print->print(elevation);
+  return bytes;
+}
+
+static int write_location_data(Print* print, const char* name,
+                               const char* latitude, const char* longitude,
+                               const char* elevation) {
+  int bytes = 0;
+  bytes += print->print("name=");
+  bytes += print_encoded_string(print, name);
+  bytes += print->print("&latitude=");
+  bytes += print_encoded_string(print, latitude);
+  bytes += print->print("&longitude=");
+  bytes += print_encoded_string(print, longitude);
+  bytes += print->print("&elevation=");
+  bytes += print_encoded_string(print, elevation);
+  return bytes;
+}
+
+int M2XStreamClient::updateLocation(const char* feedId,
+                                    const char* name,
+                                    double latitude,
+                                    double longitude,
+                                    double elevation) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+
+    int length = write_location_data(&_null_print, name, latitude, longitude,
+                                     elevation);
+    _client->print("PUT /v1/feeds/");
+    print_encoded_string(_client, feedId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(length);
+    write_location_data(_client, name, latitude, longitude, elevation);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+int M2XStreamClient::updateLocation(const char* feedId,
+                                    const char* name,
+                                    const char* latitude,
+                                    const char* longitude,
+                                    const char* elevation) {
+  if (_client->connect(_host, _port)) {
+#ifdef DEBUG
+    printf("Connected to M2X server!\n");
+#endif
+
+    int length = write_location_data(&_null_print, name, latitude, longitude,
+                                     elevation);
+    _client->print("PUT /v1/feeds/");
+    print_encoded_string(_client, feedId);
+    _client->println("/location HTTP/1.0");
+
+    writeHttpHeader(length);
+    write_location_data(_client, name, latitude, longitude, elevation);
+  } else {
+#ifdef DEBUG
+    printf("ERROR: Cannot connect to M2X server!\n");
+#endif
+    return E_NOCONNECTION;
+  }
+  return readStatusCode(true);
+}
+
+void M2XStreamClient::writeSendHeader(const char* feedId,
+                                      const char* streamName,
+                                      int contentLength) {
+  _client->print("PUT /v1/feeds/");
+  print_encoded_string(_client, feedId);
+  _client->print("/streams/");
+  print_encoded_string(_client, streamName);
+  _client->println(" HTTP/1.0");
+  
+  writeHttpHeader(contentLength);
+}
+
+void M2XStreamClient::writeHttpHeader(int contentLength) {
+  _client->println(kUserAgentLine);
+  _client->print("X-M2X-KEY: ");
+  _client->println(_key);
+  
+  _client->print("Host: ");
+  print_encoded_string(_client, _host);
+  if (_port != kDefaultM2XPort) {
+    _client->print(":");
+    // port is an integer, does not need encoding
+    _client->print(_port);
+  }
+  _client->println();
+
+  if (contentLength > 0) {
+    _client->println("Content-Type: application/x-www-form-urlencoded");
+#ifdef DEBUG
+    printf("Content Length: %d\n", contentLength);
+#endif
+    _client->print("Content-Length: ");
+    _client->println(contentLength);
+  }
+  _client->println();
+}
+
+int M2XStreamClient::waitForString(const char* str) {
+  int currentIndex = 0;
+  if (str[currentIndex] == '\0') return E_OK;
+
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+#ifdef DEBUG
+      printf("%c", c);
+#endif
+
+      if ((str[currentIndex] == '*') ||
+          (c == str[currentIndex])) {
+        currentIndex++;
+        if (str[currentIndex] == '\0') {
+          return E_OK;
+        }
+      } else {
+        // start from the beginning
+        currentIndex = 0;
+      }
+    }
+
+    if (!_client->connected()) {
+#ifdef DEBUG
+      printf("ERROR: The client is disconnected from the server!\n");
+#endif
+      close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
+  }
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::readStatusCode(bool closeClient) {
+  int responseCode = 0;
+  int ret = waitForString("HTTP/*.* ");
+  if (ret != E_OK) {
+    if (closeClient) close();
+    return ret;
+  }
+
+  // ret is not needed from here(since it must be E_OK), so we can use it
+  // as a regular variable now.
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+#ifdef DEBUG
+      printf("%c", c);
+#endif
+      responseCode = responseCode * 10 + (c - '0');
+      ret++;
+      if (ret == 3) {
+        if (closeClient) close();
+        return responseCode;
+      }
+    }
+
+    if (!_client->connected()) {
+#ifdef DEBUG
+      printf("ERROR: The client is disconnected from the server!\n");
+#endif
+      if (closeClient) close();
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
+  }
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::readContentLength() {
+  int ret = waitForString("Content-Length: ");
+  if (ret != E_OK) {
+    return ret;
+  }
+
+  // From now on, ret is not needed, we can use it
+  // to keep the final result
+  ret = 0;
+  while (true) {
+    while (_client->available()) {
+      char c = _client->read();
+#ifdef DEBUG
+      printf("%c", c);
+#endif
+      if ((c == '\r') || (c == '\n')) {
+        return (ret == 0) ? (E_INVALID) : (ret);
+      } else {
+        ret = ret * 10 + (c - '0');
+      }
+    }
+
+    if (!_client->connected()) {
+#ifdef DEBUG
+      printf("ERROR: The client is disconnected from the server!\n");
+#endif
+      return E_DISCONNECTED;
+    }
+
+    delay(1000);
+  }
+
+  // never reached here
+  return E_NOTREACHABLE;
+}
+
+int M2XStreamClient::skipHttpHeader() {
+  return waitForString("\r\n\r\n");
+}
+
+void M2XStreamClient::close() {
+  // Eats up buffered data before closing
+  _client->flush();
+  _client->stop();
+}
+
+int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
+                                     void* context) {
+  const int BUF_LEN = 32;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  stream_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_stream_key_found;
+  cbs.string_found = on_stream_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+#ifdef DEBUG
+    printf("Received Data: ");
+#endif
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+#ifdef DEBUG
+      printf("%c", buf[i - 1]);
+#endif
+    }
+#ifdef DEBUG
+      printf("\n");
+#endif
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+int M2XStreamClient::readLocation(location_read_callback callback,
+                                  void* context) {
+  const int BUF_LEN = 40;
+  char buf[BUF_LEN];
+
+  int length = readContentLength();
+  if (length < 0) {
+    close();
+    return length;
+  }
+
+  int index = skipHttpHeader();
+  if (index != E_OK) {
+    close();
+    return index;
+  }
+  index = 0;
+
+  location_parsing_context_state state;
+  state.state = state.index = 0;
+  state.callback = callback;
+  state.context = context;
+
+  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
+  cbs.key_found = on_location_key_found;
+  cbs.string_found = on_location_string_found;
+  cbs.context.client_state = &state;
+
+  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
+  jsonlite_parser_set_callback(p, &cbs);
+
+  jsonlite_result result = jsonlite_result_unknown;
+  while (index < length) {
+    int i = 0;
+
+#ifdef DEBUG
+    printf("Received Data: ");
+#endif
+    while ((i < BUF_LEN) && _client->available()) {
+      buf[i++] = _client->read();
+#ifdef DEBUG
+      printf("%c", buf[i - 1]);
+#endif
+    }
+#ifdef DEBUG
+    printf("\n");
+#endif
+
+    if ((!_client->connected()) &&
+        (!_client->available()) &&
+        ((index + i) < length)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_NOCONNECTION;
+    }
+
+    result = jsonlite_parser_tokenize(p, buf, i);
+    if ((result != jsonlite_result_ok) &&
+        (result != jsonlite_result_end_of_stream)) {
+      jsonlite_parser_release(p);
+      close();
+      return E_JSON_INVALID;
+    }
+
+    index += i;
+  }
+
+  jsonlite_parser_release(p);
+  close();
+  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
+}
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/M2XStreamClient.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,128 @@
+#ifndef M2XStreamClient_h
+#define M2XStreamClient_h
+
+#define MIN(a, b) (((a) > (b))?(b):(a))
+
+#include "mbed.h"
+#include "Client.h"
+#include "Utility.h"
+
+#include "NullPrint.h"
+
+static const int E_OK = 0;
+static const int E_NOCONNECTION = -1;
+static const int E_DISCONNECTED = -2;
+static const int E_NOTREACHABLE = -3;
+static const int E_INVALID = -4;
+static const int E_JSON_INVALID = -5;
+
+typedef void (*stream_value_read_callback)(const char* at,
+                                           const char* value,
+                                           int index,
+                                           void* context);
+
+typedef void (*location_read_callback)(const char* name,
+                                       double latitude,
+                                       double longitude,
+                                       double elevation,
+                                       const char* timestamp,
+                                       int index,
+                                       void* context);
+
+class M2XStreamClient {
+public:
+  static const char* kDefaultM2XHost;
+  static const int kDefaultM2XPort = 80;
+
+  M2XStreamClient(Client* client,
+                  const char* key,
+                  const char* host = kDefaultM2XHost,
+                  int port = kDefaultM2XPort);
+
+  // Update data stream, returns the HTTP status code
+  int send(const char* feedId, const char* streamName, double value);
+  int send(const char* feedId, const char* streamName, long value);
+  int send(const char* feedId, const char* streamName, int value);
+  int send(const char* feedId, const char* streamName, const char* value);
+
+  // Receive values for a particular data stream. Since memory is
+  // very limited on an Arduino, we cannot parse and get all the
+  // data points in memory. Instead, we use callbacks here: whenever
+  // a new data point is parsed, we call the callback using the values,
+  // after that, the values will be thrown away to make space for new
+  // values.
+  // Note that you can also pass in a user-specified context in this
+  // function, this context will be passed to the callback function
+  // each time we get a data point.
+  // For each data point, the callback will be called once. The HTTP
+  // status code will be returned. And the content is only parsed when
+  // the status code is 200.
+  int receive(const char* feedId, const char* streamName,
+              stream_value_read_callback callback, void* context);
+
+  // Update datasource location
+  // NOTE: On an Arduino Uno and other ATMEGA based boards, double has
+  // 4-byte (32 bits) precision, which is the same as float. So there's
+  // no natural double-precision floating number on these boards. With
+  // a float value, we have a precision of roughly 7 digits, that means
+  // either 5 or 6 digits after the floating point. According to wikipedia,
+  // a difference of 0.00001 will give us ~1.1132m distance. If this
+  // precision is good for you, you can use the double-version we provided
+  // here. Otherwise, you may need to use the string-version and do the
+  // actual conversion by yourselves.
+  // However, with an Arduino Due board, double has 8-bytes (64 bits)
+  // precision, which means you are free to use the double-version only
+  // without any precision problems.
+  // Returned value is the http status code.
+  int updateLocation(const char* feedId, const char* name,
+                     double latitude, double longitude, double elevation);
+  int updateLocation(const char* feedId, const char* name,
+                     const char* latitude, const char* longitude,
+                     const char* elevation);
+
+  // Read location information for a feed. Also used callback to process
+  // data points for memory reasons. The HTTP status code is returned,
+  // response is only parsed when the HTTP status code is 200
+  int readLocation(const char* feedId, location_read_callback callback,
+                   void* context);
+private:
+  Client* _client;
+  const char* _key;
+  const char* _host;
+  int _port;
+  NullPrint _null_print;
+
+  // Writes the HTTP header part for updating a stream value
+  void writeSendHeader(const char* feedId,
+                       const char* streamName,
+                       int contentLength);
+  // Writes HTTP header lines including M2X key, host, content
+  // type and content length(if the body exists)
+  void writeHttpHeader(int contentLength);
+  // Parses HTTP response header and return the content length.
+  // Note that this function does not parse all http headers, as long
+  // as the content length is found, this function will return
+  int readContentLength();
+  // Skips all HTTP response header part. Return minus value in case
+  // the connection is closed before we got all headers
+  int skipHttpHeader();
+  // Parses and returns the HTTP status code, note this function will
+  // return immediately once it gets the status code
+  int readStatusCode(bool closeClient);
+  // Waits for a certain string pattern in the HTTP header, and returns
+  // once the pattern is found. In the pattern, you can use '*' to denote
+  // any character
+  int waitForString(const char* str);
+  // Closes the connection
+  void close();
+  // Parses JSON response of stream value API, and calls callback function
+  // once we get a data point
+  int readStreamValue(stream_value_read_callback callback, void* context);
+  // Parses JSON response of location API, and calls callback function once
+  // we get a data point
+  int readLocation(location_read_callback callback, void* context);
+};
+
+#endif  /* M2XStreamClient_h */
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/NullPrint.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,19 @@
+#ifndef NullPrint_h
+#define NullPrint_h
+
+#include "Print.h"
+
+// Null Print class used to calculate length to print
+class NullPrint : public Print {
+public:
+  virtual size_t write(uint8_t b) {
+    return 1;
+  }
+  
+  virtual size_t write(const uint8_t* buf, size_t size) {
+    return size;
+  }
+};
+
+#endif  /* NullPrint_h */
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Print.cpp	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,63 @@
+#include "Print.h"
+#include "mbed.h"
+
+#include <stdio.h>
+#include <string.h>
+
+size_t Print::write(const uint8_t* buf, size_t size) {
+  size_t ret = 0;
+  while (size--) {
+    ret += write(*buf++);
+  }
+  return ret;
+}
+
+size_t Print::print(const char* s) {
+  return write((const uint8_t*)s, strlen(s));
+}
+
+size_t Print::print(char c) {
+  return write(c);
+}
+
+size_t Print::print(int n) {
+  return print((long) n);
+}
+
+size_t Print::print(long n) {
+  char buf[8 * sizeof(long) + 1];
+  snprintf(buf, sizeof(buf), "%ld", n);
+  return print(buf);
+}
+
+// Digits are ignored for now
+size_t Print::print(double n, int digits) {
+  char buf[65];
+  snprintf(buf, sizeof(buf), "%g", n);
+  return print(buf);
+}
+
+size_t Print::println(const char* s) {
+  return print(s) + println();
+}
+
+size_t Print::println(char c) {
+  return print(c) + println();
+}
+
+size_t Print::println(int n) {
+  return print(n) + println();
+}
+
+size_t Print::println(long n) {
+  return print(n) + println();
+}
+
+size_t Print::println(double n, int digits) {
+  return print(n, digits) + println();
+}
+
+size_t Print::println() {
+  return print('\r') + print('\n');
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Print.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,27 @@
+#ifndef Print_h
+#define Print_h
+
+#include <stddef.h>
+#include <stdint.h>
+
+class Print {
+public:
+  size_t print(const char* s);
+  size_t print(char c);
+  size_t print(int n);
+  size_t print(long n);
+  size_t print(double n, int digits = 2);
+
+  size_t println(const char* s);
+  size_t println(char c);
+  size_t println(int n);
+  size_t println(long n);
+  size_t println(double n, int digits = 2);
+  size_t println();
+
+  virtual size_t write(uint8_t c) = 0;
+  virtual size_t write(const uint8_t* buf, size_t size);
+};
+
+#endif
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/StreamParseFunctions.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,70 @@
+#ifndef StreamParseFunctions_h
+#define StreamParseFunctions_h
+
+// Data structures and functions used to parse stream values
+
+#define STREAM_BUF_LEN 20
+
+typedef struct {
+  uint8_t state;
+  char at_str[STREAM_BUF_LEN + 1];
+  char value_str[STREAM_BUF_LEN + 1];
+  int index;
+
+  stream_value_read_callback callback;
+  void* context;
+} stream_parsing_context_state;
+
+#define WAITING_AT 0x1
+#define GOT_AT 0x2
+#define WAITING_VALUE 0x4
+#define GOT_VALUE 0x8
+
+#define GOT_STREAM (GOT_AT | GOT_VALUE)
+#define TEST_GOT_STREAM(state_) (((state_) & GOT_STREAM) == GOT_STREAM)
+
+#define TEST_IS_AT(state_) (((state_) & (WAITING_AT | GOT_AT)) == WAITING_AT)
+#define TEST_IS_VALUE(state_) (((state_) & (WAITING_VALUE | GOT_VALUE)) == \
+                               WAITING_VALUE)
+
+static void on_stream_key_found(jsonlite_callback_context* context,
+                                jsonlite_token* token)
+{
+  stream_parsing_context_state* state =
+      (stream_parsing_context_state*) context->client_state;
+  if (strncmp((const char*) token->start, "at", 2) == 0) {
+    state->state |= WAITING_AT;
+  } else if ((strncmp((const char*) token->start, "value", 5) == 0) &&
+             (token->start[5] != 's')) { // get rid of "values"
+    state->state |= WAITING_VALUE;
+  }
+}
+
+static void on_stream_string_found(jsonlite_callback_context* context,
+                                   jsonlite_token* token)
+{
+  stream_parsing_context_state* state =
+      (stream_parsing_context_state*) context->client_state;
+
+  if (TEST_IS_AT(state->state)) {
+    strncpy(state->at_str, (const char*) token->start,
+            MIN(token->end - token->start, STREAM_BUF_LEN));
+    state->at_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
+    state->state |= GOT_AT;
+  } else if (TEST_IS_VALUE(state->state)) {
+    strncpy(state->value_str, (const char*) token->start,
+            MIN(token->end - token->start, STREAM_BUF_LEN));
+    state->value_str[MIN(token->end - token->start, STREAM_BUF_LEN)] = '\0';
+    state->state |= GOT_VALUE;
+  }
+
+  if (TEST_GOT_STREAM(state->state)) {
+    state->callback(state->at_str, state->value_str,
+                    state->index++, state->context);
+    state->state = 0;
+  }
+}
+
+#endif  /* StreamParseFunctions_h */
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Utility.cpp	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,12 @@
+#include "Utility.h"
+
+void delay(int ms) {
+  wait_ms(ms);
+}
+
+char* strdup(const char* s) {
+  char* ret = (char*) malloc(strlen(s) + 1);
+  if (ret == NULL) { return ret;}
+  return strcpy(ret, s);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Utility.h	Thu Oct 24 12:22:33 2013 +0000
@@ -0,0 +1,18 @@
+#ifndef UTILITY_H_
+#define UTILITY_H_
+
+#include "mbed.h"
+#include <string.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void delay(int ms);
+char* strdup(const char* s);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif