Free (GPLv2) TCP/IP stack developed by TASS Belgium

Dependents:   lpc1768-picotcp-demo ZeroMQ_PicoTCP_Publisher_demo TCPSocket_HelloWorld_PicoTCP Pico_TCP_UDP_Test ... more

PicoTCP. Copyright (c) 2013 TASS Belgium NV.

Released under the GNU General Public License, version 2.

Different licensing models may exist, at the sole discretion of the Copyright holders.

Official homepage: http://www.picotcp.com

Bug tracker: https://github.com/tass-belgium/picotcp/issues

Development steps:

  • initial integration with mbed RTOS
  • generic mbed Ethernet driver
  • high performance NXP LPC1768 specific Ethernet driver
  • Multi-threading support for mbed RTOS
  • Berkeley sockets and integration with the New Socket API
  • Fork of the apps running on top of the New Socket API
  • Scheduling optimizations
  • Debugging/benchmarking/testing

Demo application (measuring TCP sender performance):

Import programlpc1768-picotcp-demo

A PicoTCP demo app testing the ethernet throughput on the lpc1768 mbed board.

Revision:
63:97f481e33cb2
Parent:
51:ab4529a384a6
--- a/modules/pico_zmq.c	Mon Sep 16 12:07:35 2013 +0000
+++ b/modules/pico_zmq.c	Thu Sep 19 12:38:53 2013 +0000
@@ -14,20 +14,13 @@
 #define MY_VERSION 1u
 
  
-enum zmq_state {
-  ST_OPEN = 0,
+enum zmq_hshake_state {
+  ST_LISTEN = 0,
   ST_CONNECTED,
   ST_SIGNATURE,
   ST_VERSION,
   ST_GREETING,
-  ST_RDY,
-  ST_BUSY
-};
-
-enum zmq_role {
-  ROLE_NONE = 0,
-  ROLE_PUBLISHER,
-  ROLE_SUBSCRIBER
+  ST_RDY
 };
 
 struct __attribute__((packed)) zmq_msg {
@@ -40,25 +33,22 @@
 
 struct zmq_connector {
   struct pico_socket *sock;
-  enum zmq_state state;
-  ZMQ parent;
-  enum zmq_role role;
-  uint8_t bytes_received;
+  enum zmq_hshake_state state;
+  struct zmq_socket *parent;
   struct zmq_connector *next;
 };
 
 struct zmq_socket {
   struct pico_socket *sock;
-  void (*ready)(ZMQ z);
-  enum zmq_state state;
+  void (*ready)(struct zmq_socket *z);
+  enum zmq_hshake_state state;
   struct zmq_connector *subs;
-  enum zmq_role role;
 };
 
 static int zmq_socket_cmp(void *ka, void *kb)
 {
-  ZMQ a = ka;
-  ZMQ b = kb;
+  struct zmq_socket *a = ka;
+  struct zmq_socket *b = kb;
   if (a->sock < b->sock)
     return -1;
   if (b->sock < a->sock)
@@ -67,7 +57,7 @@
 }
 PICO_TREE_DECLARE(zmq_sockets, zmq_socket_cmp);
 
-static inline ZMQ ZMTP(struct pico_socket *s)
+static inline struct zmq_socket *ZMTP(struct pico_socket *s)
 {
   struct zmq_socket tst = { .sock = s };
   return (pico_tree_findKey(&zmq_sockets, &tst));
@@ -75,7 +65,7 @@
 
 static inline struct zmq_connector *find_subscriber(struct pico_socket *s)
 {
-  ZMQ search;
+  struct zmq_socket *search;
   struct pico_tree_node *idx;
   struct zmq_connector *el;
   pico_tree_foreach(idx, &zmq_sockets) {
@@ -91,7 +81,7 @@
 }
 
 
-static void zmq_connector_add(ZMQ z, struct zmq_connector *zc)
+static void connector_add(struct zmq_socket *z, struct zmq_connector *zc)
 {
   zc->next = z->subs;
   z->subs = zc;
@@ -99,17 +89,17 @@
   dbg("Added connector %p, sock is %p\n", zc, zc->sock);
 }
 
-static void zmq_connector_del(struct zmq_connector *zc)
+static void connector_del(struct zmq_connector *zc)
 {
-  ZMQ z = zc->parent;
+  struct zmq_socket *z = zc->parent;
   if(z) {
-    struct zmq_connector *el = z->subs, *prev = NULL;      /* el = pointer to linked list */
+    struct zmq_connector *el = z->subs, *prev = NULL;
     while(el) {
-      if (el == zc) {               /* did we find the connector that we want to delete? */
-        if (prev)                   /* was there a previous list item? */
-          prev->next = zc->next;    /* link the linked list again */
+      if (el == zc) {
+        if (prev)
+          prev->next = zc->next;
         else
-          z->subs = zc->next;       /* we were at the beginning of the list */
+          z->subs = zc->next;;
         break;
       }
       prev = el;
@@ -120,76 +110,47 @@
   pico_free(zc);
 }
 
-static void zmq_check_state(ZMQ z) 
+
+static void hs_connected(struct zmq_connector *z)
 {
-  struct zmq_connector *c = z->subs;
-  enum zmq_state default_state, option_state;
-  if ((z->state != ST_RDY) && (z->state != ST_BUSY))
-    return;
-  if (z->role == ROLE_SUBSCRIBER) {
-    default_state = ST_RDY;
-    option_state = ST_BUSY;
-  } else {
-    default_state = ST_BUSY;
-    option_state = ST_RDY;
-  }
-  z->state = default_state;
-  while(c) {
-    if (c->state == option_state) {
-      z->state = option_state;
-      return;
-    }
-    c = c->next;
-  }
-}
-
-
-static void zmq_hs_connected(struct zmq_connector *z)
-{
-  /* v2 signature */
-  uint8_t my_signature[14] =  {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f, 1, 1, 0, 0};
-
-//  uint8_t my_ver[2] = {MY_VERSION, 0};
-//  uint8_t my_greeting[52] = {'N','U','L','L', 0};
-
-  pico_socket_write(z->sock, my_signature, 14);
-//  pico_socket_write(z->sock, my_ver, 2);
-
-//  if (MY_VERSION > 2)
-//    pico_socket_write(z->sock, my_greeting, 52);
-
+  uint8_t my_ver[2] = {MY_VERSION, 0};
+  uint8_t my_signature[10] =  {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
+  uint8_t my_greeting[52] = {'N','U','L','L', 0};
+  pico_socket_write(z->sock, my_signature, 10);
+  pico_socket_write(z->sock, my_ver, 2);
+  if (MY_VERSION > 2)
+    pico_socket_write(z->sock, my_greeting, 52);
   z->state = ST_SIGNATURE;
-//  z->state = ST_RDY;
 }
  
-static void zmq_hs_signature(struct zmq_connector *zc)
+static void hs_signature(struct zmq_connector *z)
 {
   uint8_t incoming[20];
   int ret;
   
-  ret = pico_socket_read(zc->sock, incoming, 14);
-  if (zc->bytes_received == 0 && ret > 0 &&  incoming[0] != 0xFF) {
-    //dbg("Received invalid signature: [0]!=0xFF\n");
-    zmq_connector_del(zc);
-  }
-  zc->bytes_received += ret;
-  if (zc->bytes_received < 14) {
-    //dbg("Waiting for the rest of the sig - got %u bytes\n",zc->bytes_received);
+  ret = pico_socket_read(z->sock, incoming, 10);
+  if (ret < 10) {
+    dbg("Received invalid signature\n");
+    connector_del(z);
     return;
   }
-
-  //dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
-  zc->state = ST_RDY;
+  if (incoming[0] != 0xFF) {
+    dbg("Received invalid signature\n");
+    connector_del(z);
+    return;
+  }
+  dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
+  z->state = ST_VERSION;
 }
  
-static void zmq_hs_version(struct zmq_connector *zc)
+static void hs_version(struct zmq_connector *z)
 {
   uint8_t incoming[20];
   int ret;
-  ret = pico_socket_read(zc->sock, incoming, 2);
+  ret = pico_socket_read(z->sock, incoming, 2);
   if (ret < 0) {
     dbg("Cannot exchange valid version information. Read returned -1\n");
-    zmq_connector_del(zc);
+    connector_del(z);
     return;
   }
   if (ret == 0)
@@ -197,77 +158,62 @@
 /* Version check?    
   if (incoming[0] != 3) {
     dbg("Version %d.x not supported by this publisher\n", incoming[0]);
-    zmq_connector_del(zc);
+    connector_del(z);
     return;
   }
   dbg("Subscriber is using version 3. Good!\n");
 */
   dbg("Subscriber is using version %d. Good!\n", incoming[0]);
   if (incoming[0] == 3)
-    zc->state = ST_GREETING;
+    z->state = ST_GREETING;
   else
-    zc->state = ST_RDY;
+    z->state = ST_RDY;
 }
  
-static void zmq_hs_greeting(struct zmq_connector *zc)
+static void hs_greeting(struct zmq_connector *z)
 {
   uint8_t incoming[64];
   int ret;
-  ret = pico_socket_read(zc->sock, incoming, 64);
+  ret = pico_socket_read(z->sock, incoming, 64);
   dbg("zmq_socket_read in greeting returned %d\n", ret);    
   if (ret == 0)
    return;  
   if (ret < 0) {
     dbg("Cannot retrieve valid greeting\n");
-    zmq_connector_del(zc);
+    connector_del(z);
     return;
   }
-  zc->state = ST_RDY;
-  zmq_check_state(zc->parent);
   dbg("Paired. Sending Ready.\n");
-  pico_socket_write(zc->sock, "READY   ",8);
+  z->state = ST_RDY;
+  pico_socket_write(z->sock, "READY   ",8);
 }
 
-static void zmq_hs_rdy(struct zmq_connector *zc)
+static void hs_rdy(struct zmq_connector *z)
 {
     int ret;
     uint8_t incoming[258];
-    if (zc->role == ROLE_SUBSCRIBER)
-      return;
-    ret = pico_socket_read(zc->sock, incoming, 258);
+    ret = pico_socket_read(z->sock, incoming, 258);
     dbg("Got %d bytes from subscriber whilst in rdy state.\n", ret);
 }
-
-static void zmq_hs_busy(struct zmq_connector *zc)
-{
-  int was_busy = 0;
-  if (zc->parent->state == ST_BUSY)
-    was_busy = 1;
-  zmq_check_state(zc->parent);
-  if (was_busy && (zc->parent->state == ST_RDY) && zc->parent->ready)
-    zc->parent->ready(zc->parent);
-}
  
-static void(*zmq_hs_cb[])(struct zmq_connector *) = {
+static void(*hs_cb[])(struct zmq_connector *) = {
     NULL,
-    zmq_hs_connected,
-    zmq_hs_signature,
-    zmq_hs_version,
-    zmq_hs_greeting,
-    zmq_hs_rdy,
-    zmq_hs_busy
+    hs_connected,
+    hs_signature,
+    hs_version,
+    hs_greeting,
+    hs_rdy
 };
-
-
+ 
 static void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
 {
   struct pico_ip4 orig;
   uint16_t port;
   char peer[30];
   struct zmq_connector *z_a, *zc;
-  ZMQ z = ZMTP(s);
+  struct zmq_socket *z = ZMTP(s);
   
-  /* Publisher. Accepting new subscribers */
+  /* Accepting new subscribers... */
   if (z) {
     if (ev & PICO_SOCK_EV_CONN) { 
       z_a = pico_zalloc(sizeof(struct zmq_socket));
@@ -276,13 +222,12 @@
       
       z_a->sock = pico_socket_accept(s, &orig, &port);
       pico_ipv4_to_string(peer, orig.addr);
-      dbg("tcp0mq> Connection requested by %s:%u.\n", peer, short_be(port));
-      if (z->state == ST_OPEN) {
-          dbg("tcp0mq> Accepted connection! New subscriber on sock %p.\n",z_a->sock);
-          zmq_connector_add(z, z_a);
-          z_a->role = ROLE_PUBLISHER;
+      dbg("tcp0mq> Connection requested by %s:%d.\n", peer, short_be(port));
+      if (z->state == ST_LISTEN) {
+          dbg("tcp0mq> Accepted connection! New subscriber.\n");
+          connector_add(z, z_a);
           z_a->state = ST_CONNECTED;
-          zmq_hs_connected(z_a);
+          hs_connected(z_a);
       } else {
           dbg("tcp0mq> Server busy, connection rejected\n");
           pico_socket_close(z_a->sock);
@@ -293,105 +238,59 @@
 
   zc = find_subscriber(s);
   if (!zc) {
-    dbg("Cannot find subscriber with socket %p, ev = %d!\n", s, ev);
-//    pico_socket_close(s);
+    dbg("Cannot find subscriber!\n");
     return;
   }
 
-  if ((ev & PICO_SOCK_EV_CONN) && zc->role == ROLE_SUBSCRIBER && zc->state == ST_OPEN)
-  {
-     zc->state = ST_CONNECTED;
-     zmq_hs_connected(zc);
-  }
-
 
   if (ev & PICO_SOCK_EV_RD) {
-    if (zmq_hs_cb[zc->state])
-      zmq_hs_cb[zc->state](zc);
-  }
-
-  if ((ev & PICO_SOCK_EV_WR) && zc->parent && (zc->parent->role == ROLE_PUBLISHER) && (zc->state == ST_BUSY)) {
-    if (zmq_hs_cb[zc->state])
-      zmq_hs_cb[zc->state](zc);
+    if (hs_cb[zc->state])
+      hs_cb[zc->state](zc);
   }
  
  
   if (ev & PICO_SOCK_EV_FIN) {
     dbg("tcp0mq> Connection closed.\n");
-    zmq_connector_del(zc);
+    connector_del(zc);
   }
  
   if (ev & PICO_SOCK_EV_ERR) {
     dbg("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
-    zmq_connector_del(zc);
+    connector_del(zc);
   }
  
   if (ev & PICO_SOCK_EV_CLOSE) {
     dbg("tcp0mq> event close\n");
-    zmq_connector_del(zc);
+    connector_del(zc);
   }
  
-}
-
-ZMQ zmq_subscriber(void (*cb)(ZMQ z))
-{
-  ZMQ z = pico_zalloc(sizeof(struct zmq_socket));
-  if (!z) {
-    pico_err = PICO_ERR_ENOMEM;
-    return NULL;
+  if (ev & PICO_SOCK_EV_WR) {
+  /* TODO: implement a counter to wake up parent when all subscribers are ready */
+  //  if (z->ready)
+  //    z->ready(z);
   }
-  z->state = ST_BUSY;
-  z->ready = cb;
-  z->role = ROLE_SUBSCRIBER;
-  pico_tree_insert(&zmq_sockets, z);
-  return z;
 }
 
-int zmq_connect(ZMQ z, char *address, uint16_t port) 
-{
-  struct pico_ip4 ip;
-  struct zmq_connector *z_c;
-  if (pico_string_to_ipv4(address, &ip.addr) < 0) {
-    dbg("FIXME!! I need to synchronize with the dns client to get to my publisher :(\n");
-    return -1;
-  }
-
-  z_c = pico_zalloc(sizeof(struct zmq_connector));
-  if (!z_c)
-    return -1;
-  z_c->role = ROLE_SUBSCRIBER;
-  z_c->state = ST_OPEN;
-  z_c->sock = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
-  if (!z_c->sock) {
-    pico_free(z_c);
-    return -1;
-  }
-  if (pico_socket_connect(z_c->sock, &ip, short_be(port)) < 0)
-    return -1;
-  zmq_connector_add(z, z_c);
-  return 0;
-}
-
-ZMQ zmq_publisher(uint16_t _port, void (*cb)(ZMQ z))
+ZMQ zmq_producer(uint16_t _port, void (*cb)(ZMQ z))
 {
   struct pico_socket *s;
   struct pico_ip4 inaddr_any = {0};
   uint16_t port = short_be(_port);
-  ZMQ z = NULL;
+  struct zmq_socket *z = NULL;
   s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
   if (!s)
     return NULL;
  
-  dbg("zmq_publisher: BIND\n");
+  dbg("zmq_producer: BIND\n");
   if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
-    dbg("zmq publisher: BIND failed\n");
+    dbg("zmq producer: BIND failed\n");
     return NULL;
   }
-  if (pico_socket_listen(s, 2) != 0) {
-    dbg("zmq publisher: LISTEN failed\n");
+  if (pico_socket_listen(s, 40) != 0) {
+    dbg("zmq producer: LISTEN failed\n");
     return NULL;
   }
-  dbg("zmq_publisher: Active and bound to local port %d\n", short_be(port));
+  dbg("zmq_producer: Active and bound to local port %d\n", short_be(port));
 
   z = pico_zalloc(sizeof(struct zmq_socket));
   if (!z) {
@@ -400,33 +299,29 @@
     return NULL;
   }
   z->sock = s;
-  z->state = ST_OPEN;
+  z->state = ST_LISTEN;
   z->ready = cb;
-  z->role = ROLE_PUBLISHER;
   z->subs = NULL;
   pico_tree_insert(&zmq_sockets, z);
-  dbg("zmq publisher created.\n");
+  dbg("zmq producer created.\n");
   return z;
 }
 
-int zmq_send(ZMQ z, char *txt, int len)
+int zmq_send(struct zmq_socket *z, char *txt, int len)
 {
     struct zmq_msg *msg;
     struct zmq_connector *c = z->subs;
     int ret = 0;
 
     if (!c) 
-    {
-        dbg("no subscribers, bailing out\n");
-        return 0; /* Need at least one subscriber */
-    }
+      return 0; /* Need at least one subscriber */
+  
     msg = pico_zalloc(len + 2);
     msg->flags = 4;
     msg->len = (uint8_t) len;
     memcpy(msg->txt, txt, len);
 
     while (c) {
-      dbg("write to %u\n",c->state);
       if ((ST_RDY == c->state) && (pico_socket_write(c->sock, msg, len + 2) > 0))
         ret++;
       c = c->next;
@@ -434,39 +329,3 @@
     pico_free(msg);
     return ret;
 }
-
-int zmq_recv(ZMQ z, char *txt)
-{
-  int ret;
-  struct zmq_msg msg;
-  struct zmq_connector *nxt, *c = z->subs;
-  if (z->state != ST_RDY)
-    return 0;
-  while (c) {
-    nxt = c->next;
-    ret = pico_socket_read(c->sock, &msg, 2);
-    if (ret < 0) {
-      dbg("Error reading!\n");
-      zmq_connector_del(c);
-    } else if (ret < 2) {
-      c->state = ST_BUSY;
-    } else {
-      return pico_socket_read(c->sock, txt, msg.len);
-    }
-    c = nxt;
-  }
-  zmq_check_state(z);
-  return 0;
-}
-
-void zmq_close(ZMQ z)
-{
-  struct zmq_connector *nxt, *c = z->subs;
-  while(c) {
-    nxt = c->next;
-    zmq_connector_del(c);
-    c = nxt;
-  }
-  pico_socket_close(z->sock);
-  pico_free(z); 
-}