ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2

Dependencies:   PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed

Revision:
0:c3b9517c3c53
Child:
1:907f8c9fa45d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main.cpp	Sat Jun 22 14:51:37 2013 +0000
@@ -0,0 +1,202 @@
+/* PicoTCP ZeroMQ Test Publisher 
+ * Copyright 2013 - Daniele Lacamera
+ * 
+ * GPL v2
+ */
+
+#include "mbed.h"
+#include "EthernetInterface.h"
+
+DigitalOut myled(LED1);
+static struct pico_socket *zmq_sock = NULL;
+
+enum zmq_hshake_state {
+  ST_LISTEN = 0,
+  ST_CONNECTED,
+  ST_SIGNATURE,
+  ST_VERSION,
+  ST_GREETING,
+  ST_RDY
+} Handshake_state = ST_LISTEN;
+
+struct zmq_msg {
+    uint8_t flags;
+    uint8_t len;
+    char    txt[0];
+};
+
+
+void zmq_send(struct pico_socket *s, char *txt, int len)
+{
+    struct zmq_msg msg;
+    msg.flags = 0;
+    msg.len = (uint8_t) len;
+    memcpy(msg.txt, txt, len);
+    pico_socket_write(s, &msg, len + 2);
+}
+
+
+static void hs_connected(struct pico_socket *s)
+{
+  uint8_t my_signature[10] =  {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
+  pico_socket_write(s, my_signature, 10);
+  Handshake_state = ST_SIGNATURE;
+}
+
+static void hs_signature(struct pico_socket *s)
+{
+  uint8_t incoming[20];
+  int ret;
+  uint8_t my_ver[2] = {3u, 0};
+  ret = pico_socket_read(s, incoming, 10);
+  if (ret < 10) {
+    printf("Received invalid signature\n");
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+    return;
+  }
+  if (incoming[0] != 0xFF) {
+    printf("Received invalid signature\n");
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+    return;
+  }
+  printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
+  pico_socket_write(s, my_ver, 2);
+  Handshake_state = ST_VERSION;
+}
+
+static void hs_version(struct pico_socket *s)
+{
+  uint8_t incoming[20];
+  int ret;
+  uint8_t my_greeting[53] = {'N','U','L','L', 0};
+  ret = pico_socket_read(s, incoming, 2);
+  if (ret < 1) {
+    printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret);
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+    return;
+  }
+  if (incoming[0] != 3) {
+    printf("Version %d.x not supported by this publisher\n", incoming[0]);
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+    return;
+  }
+  pico_socket_write(s, my_greeting, 53);
+  Handshake_state = ST_GREETING;
+}
+
+static void hs_greeting(struct pico_socket *s)
+{
+  uint8_t incoming[53];
+  int ret;
+  uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '};
+  ret = pico_socket_read(s, incoming, 53);
+  if (ret < 53) {
+    printf("Cannot retrieve valid greeting\n");
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+    return;
+  }
+  zmq_send(s, "READY   ", 8);
+  Handshake_state = ST_RDY;
+}
+
+static void hs_rdy(struct pico_socket *s)
+{
+    int ret;
+    uint8_t incoming[258];
+    pico_socket_read(s, incoming, 258);
+    printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
+}
+
+static void(*hs_cb[])(struct pico_socket *) = {
+    NULL,
+    hs_connected,
+    hs_signature,
+    hs_version,
+    hs_greeting,
+    hs_rdy
+};
+
+void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
+{
+  struct pico_ip4 orig;
+  uint16_t port;
+  char peer[30];
+
+  if (ev & PICO_SOCK_EV_RD) {
+    if (hs_cb[Handshake_state])
+      hs_cb[Handshake_state](s);
+  }
+
+  if (ev & PICO_SOCK_EV_CONN) { 
+    zmq_sock = pico_socket_accept(s, &orig, &port);
+    pico_ipv4_to_string(peer, orig.addr);
+    printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port));
+    Handshake_state = ST_CONNECTED;
+  }
+
+  if (ev & PICO_SOCK_EV_FIN) {
+    printf("tcp0mq> Connection closed.\n");
+    Handshake_state = ST_LISTEN;
+  }
+
+  if (ev & PICO_SOCK_EV_ERR) {
+    printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
+    printf("tcp0mq> Connection closed.\n");
+    Handshake_state = ST_LISTEN;
+  }
+
+  if (ev & PICO_SOCK_EV_CLOSE) {
+    printf("tcp0mq> event close\n");
+    pico_socket_close(s);
+    Handshake_state = ST_LISTEN;
+  }
+
+  if (ev & PICO_SOCK_EV_WR) {
+    /* TODO: manage pending data */
+  }
+}
+
+
+
+int main() {
+  int counter = 0;
+  EthernetInterface eth;
+  eth.init(); //Use DHCP
+  eth.connect();
+  pico_stack_init();
+  
+  struct pico_socket *s;
+  struct pico_ip4 server_addr;
+  uint16_t port = short_be(9000);
+  struct pico_ip4 inaddr_any = {0};
+
+  s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
+  if (!s)
+    while(1);;
+  
+  printf("tcp0mq> BIND\n");
+  if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
+    printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
+    while(1);;
+  }
+
+  printf("tcp0mq> LISTEN\n");
+  if (pico_socket_listen(s, 40) != 0)
+     while(1);;
+  printf("tcp0mq> listening port %u ...\n",short_be(port));
+
+  while(1) {
+    pico_stack_tick();
+    wait(0.001);
+    if((counter++ > 500) && (Handshake_state == ST_RDY)) {
+        zmq_send(zmq_sock, "HELLO WORLD", 10);
+        counter = 0;
+        myled = !myled;
+    }
+  }
+}