ZeroMQ publisher demo application running on LPC1768 and PicoTCP. GPL v2

Dependencies:   PicoTCP lpc1768-picotcp-eth-polling mbed-rtos mbed

Committer:
daniele
Date:
Sat Jun 22 14:51:37 2013 +0000
Revision:
0:c3b9517c3c53
Child:
1:907f8c9fa45d
First version of the demo application

Who changed what in which revision?

User | Revision | Line number | New contents of line
daniele 0:c3b9517c3c53 1 /* PicoTCP ZeroMQ Test Publisher
daniele 0:c3b9517c3c53 2 * Copyright 2013 - Daniele Lacamera
daniele 0:c3b9517c3c53 3 *
daniele 0:c3b9517c3c53 4 * GPL v2
daniele 0:c3b9517c3c53 5 */
daniele 0:c3b9517c3c53 6
daniele 0:c3b9517c3c53 7 #include "mbed.h"
daniele 0:c3b9517c3c53 8 #include "EthernetInterface.h"
daniele 0:c3b9517c3c53 9
daniele 0:c3b9517c3c53 10 DigitalOut myled(LED1);
daniele 0:c3b9517c3c53 11 static struct pico_socket *zmq_sock = NULL;
daniele 0:c3b9517c3c53 12
/*
 * Progress of the handshake with the connected peer.
 * The numeric values are used as indices into the hs_cb[] dispatch table,
 * so the order of the enumerators must not change.
 */
enum zmq_hshake_state {
    ST_LISTEN    = 0,   /* no peer; waiting for a connection      */
    ST_CONNECTED = 1,   /* peer accepted, nothing exchanged yet   */
    ST_SIGNATURE = 2,   /* our signature sent, awaiting the peer's */
    ST_VERSION   = 3,   /* our version sent, awaiting the peer's   */
    ST_GREETING  = 4,   /* our greeting sent, awaiting the peer's  */
    ST_RDY       = 5    /* handshake complete; publishing allowed  */
};

/* Current handshake state, shared by the socket callback and main(). */
enum zmq_hshake_state Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 21
daniele 0:c3b9517c3c53 22 struct zmq_msg {
daniele 0:c3b9517c3c53 23 uint8_t flags;
daniele 0:c3b9517c3c53 24 uint8_t len;
daniele 0:c3b9517c3c53 25 char txt[0];
daniele 0:c3b9517c3c53 26 };
daniele 0:c3b9517c3c53 27
daniele 0:c3b9517c3c53 28
daniele 0:c3b9517c3c53 29 void zmq_send(struct pico_socket *s, char *txt, int len)
daniele 0:c3b9517c3c53 30 {
daniele 0:c3b9517c3c53 31 struct zmq_msg msg;
daniele 0:c3b9517c3c53 32 msg.flags = 0;
daniele 0:c3b9517c3c53 33 msg.len = (uint8_t) len;
daniele 0:c3b9517c3c53 34 memcpy(msg.txt, txt, len);
daniele 0:c3b9517c3c53 35 pico_socket_write(s, &msg, len + 2);
daniele 0:c3b9517c3c53 36 }
daniele 0:c3b9517c3c53 37
daniele 0:c3b9517c3c53 38
daniele 0:c3b9517c3c53 39 static void hs_connected(struct pico_socket *s)
daniele 0:c3b9517c3c53 40 {
daniele 0:c3b9517c3c53 41 uint8_t my_signature[10] = {0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f};
daniele 0:c3b9517c3c53 42 pico_socket_write(s, my_signature, 10);
daniele 0:c3b9517c3c53 43 Handshake_state = ST_SIGNATURE;
daniele 0:c3b9517c3c53 44 }
daniele 0:c3b9517c3c53 45
daniele 0:c3b9517c3c53 46 static void hs_signature(struct pico_socket *s)
daniele 0:c3b9517c3c53 47 {
daniele 0:c3b9517c3c53 48 uint8_t incoming[20];
daniele 0:c3b9517c3c53 49 int ret;
daniele 0:c3b9517c3c53 50 uint8_t my_ver[2] = {3u, 0};
daniele 0:c3b9517c3c53 51 ret = pico_socket_read(s, incoming, 10);
daniele 0:c3b9517c3c53 52 if (ret < 10) {
daniele 0:c3b9517c3c53 53 printf("Received invalid signature\n");
daniele 0:c3b9517c3c53 54 pico_socket_close(s);
daniele 0:c3b9517c3c53 55 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 56 return;
daniele 0:c3b9517c3c53 57 }
daniele 0:c3b9517c3c53 58 if (incoming[0] != 0xFF) {
daniele 0:c3b9517c3c53 59 printf("Received invalid signature\n");
daniele 0:c3b9517c3c53 60 pico_socket_close(s);
daniele 0:c3b9517c3c53 61 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 62 return;
daniele 0:c3b9517c3c53 63 }
daniele 0:c3b9517c3c53 64 printf("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]);
daniele 0:c3b9517c3c53 65 pico_socket_write(s, my_ver, 2);
daniele 0:c3b9517c3c53 66 Handshake_state = ST_VERSION;
daniele 0:c3b9517c3c53 67 }
daniele 0:c3b9517c3c53 68
daniele 0:c3b9517c3c53 69 static void hs_version(struct pico_socket *s)
daniele 0:c3b9517c3c53 70 {
daniele 0:c3b9517c3c53 71 uint8_t incoming[20];
daniele 0:c3b9517c3c53 72 int ret;
daniele 0:c3b9517c3c53 73 uint8_t my_greeting[53] = {'N','U','L','L', 0};
daniele 0:c3b9517c3c53 74 ret = pico_socket_read(s, incoming, 2);
daniele 0:c3b9517c3c53 75 if (ret < 1) {
daniele 0:c3b9517c3c53 76 printf("Cannot exchange valid version information. Read returned %d, expected at least one byte.\n", ret);
daniele 0:c3b9517c3c53 77 pico_socket_close(s);
daniele 0:c3b9517c3c53 78 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 79 return;
daniele 0:c3b9517c3c53 80 }
daniele 0:c3b9517c3c53 81 if (incoming[0] != 3) {
daniele 0:c3b9517c3c53 82 printf("Version %d.x not supported by this publisher\n", incoming[0]);
daniele 0:c3b9517c3c53 83 pico_socket_close(s);
daniele 0:c3b9517c3c53 84 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 85 return;
daniele 0:c3b9517c3c53 86 }
daniele 0:c3b9517c3c53 87 pico_socket_write(s, my_greeting, 53);
daniele 0:c3b9517c3c53 88 Handshake_state = ST_GREETING;
daniele 0:c3b9517c3c53 89 }
daniele 0:c3b9517c3c53 90
daniele 0:c3b9517c3c53 91 static void hs_greeting(struct pico_socket *s)
daniele 0:c3b9517c3c53 92 {
daniele 0:c3b9517c3c53 93 uint8_t incoming[53];
daniele 0:c3b9517c3c53 94 int ret;
daniele 0:c3b9517c3c53 95 uint8_t my_rdy[8] = {'R','E','A','D','Y',' ',' ',' '};
daniele 0:c3b9517c3c53 96 ret = pico_socket_read(s, incoming, 53);
daniele 0:c3b9517c3c53 97 if (ret < 53) {
daniele 0:c3b9517c3c53 98 printf("Cannot retrieve valid greeting\n");
daniele 0:c3b9517c3c53 99 pico_socket_close(s);
daniele 0:c3b9517c3c53 100 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 101 return;
daniele 0:c3b9517c3c53 102 }
daniele 0:c3b9517c3c53 103 zmq_send(s, "READY ", 8);
daniele 0:c3b9517c3c53 104 Handshake_state = ST_RDY;
daniele 0:c3b9517c3c53 105 }
daniele 0:c3b9517c3c53 106
daniele 0:c3b9517c3c53 107 static void hs_rdy(struct pico_socket *s)
daniele 0:c3b9517c3c53 108 {
daniele 0:c3b9517c3c53 109 int ret;
daniele 0:c3b9517c3c53 110 uint8_t incoming[258];
daniele 0:c3b9517c3c53 111 pico_socket_read(s, incoming, 258);
daniele 0:c3b9517c3c53 112 printf("Got %d bytes from subscriber whilst in rdy state.\n", ret);
daniele 0:c3b9517c3c53 113 }
daniele 0:c3b9517c3c53 114
daniele 0:c3b9517c3c53 115 static void(*hs_cb[])(struct pico_socket *) = {
daniele 0:c3b9517c3c53 116 NULL,
daniele 0:c3b9517c3c53 117 hs_connected,
daniele 0:c3b9517c3c53 118 hs_signature,
daniele 0:c3b9517c3c53 119 hs_version,
daniele 0:c3b9517c3c53 120 hs_greeting,
daniele 0:c3b9517c3c53 121 hs_rdy
daniele 0:c3b9517c3c53 122 };
daniele 0:c3b9517c3c53 123
daniele 0:c3b9517c3c53 124 void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
daniele 0:c3b9517c3c53 125 {
daniele 0:c3b9517c3c53 126 struct pico_ip4 orig;
daniele 0:c3b9517c3c53 127 uint16_t port;
daniele 0:c3b9517c3c53 128 char peer[30];
daniele 0:c3b9517c3c53 129
daniele 0:c3b9517c3c53 130 if (ev & PICO_SOCK_EV_RD) {
daniele 0:c3b9517c3c53 131 if (hs_cb[Handshake_state])
daniele 0:c3b9517c3c53 132 hs_cb[Handshake_state](s);
daniele 0:c3b9517c3c53 133 }
daniele 0:c3b9517c3c53 134
daniele 0:c3b9517c3c53 135 if (ev & PICO_SOCK_EV_CONN) {
daniele 0:c3b9517c3c53 136 zmq_sock = pico_socket_accept(s, &orig, &port);
daniele 0:c3b9517c3c53 137 pico_ipv4_to_string(peer, orig.addr);
daniele 0:c3b9517c3c53 138 printf("tcp0mq> Connection established with %s:%d.\n", peer, short_be(port));
daniele 0:c3b9517c3c53 139 Handshake_state = ST_CONNECTED;
daniele 0:c3b9517c3c53 140 }
daniele 0:c3b9517c3c53 141
daniele 0:c3b9517c3c53 142 if (ev & PICO_SOCK_EV_FIN) {
daniele 0:c3b9517c3c53 143 printf("tcp0mq> Connection closed.\n");
daniele 0:c3b9517c3c53 144 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 145 }
daniele 0:c3b9517c3c53 146
daniele 0:c3b9517c3c53 147 if (ev & PICO_SOCK_EV_ERR) {
daniele 0:c3b9517c3c53 148 printf("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
daniele 0:c3b9517c3c53 149 printf("tcp0mq> Connection closed.\n");
daniele 0:c3b9517c3c53 150 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 151 }
daniele 0:c3b9517c3c53 152
daniele 0:c3b9517c3c53 153 if (ev & PICO_SOCK_EV_CLOSE) {
daniele 0:c3b9517c3c53 154 printf("tcp0mq> event close\n");
daniele 0:c3b9517c3c53 155 pico_socket_close(s);
daniele 0:c3b9517c3c53 156 Handshake_state = ST_LISTEN;
daniele 0:c3b9517c3c53 157 }
daniele 0:c3b9517c3c53 158
daniele 0:c3b9517c3c53 159 if (ev & PICO_SOCK_EV_WR) {
daniele 0:c3b9517c3c53 160 /* TODO: manage pending data */
daniele 0:c3b9517c3c53 161 }
daniele 0:c3b9517c3c53 162 }
daniele 0:c3b9517c3c53 163
daniele 0:c3b9517c3c53 164
daniele 0:c3b9517c3c53 165
daniele 0:c3b9517c3c53 166 int main() {
daniele 0:c3b9517c3c53 167 int counter = 0;
daniele 0:c3b9517c3c53 168 EthernetInterface eth;
daniele 0:c3b9517c3c53 169 eth.init(); //Use DHCP
daniele 0:c3b9517c3c53 170 eth.connect();
daniele 0:c3b9517c3c53 171 pico_stack_init();
daniele 0:c3b9517c3c53 172
daniele 0:c3b9517c3c53 173 struct pico_socket *s;
daniele 0:c3b9517c3c53 174 struct pico_ip4 server_addr;
daniele 0:c3b9517c3c53 175 uint16_t port = short_be(9000);
daniele 0:c3b9517c3c53 176 struct pico_ip4 inaddr_any = {0};
daniele 0:c3b9517c3c53 177
daniele 0:c3b9517c3c53 178 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
daniele 0:c3b9517c3c53 179 if (!s)
daniele 0:c3b9517c3c53 180 while(1);;
daniele 0:c3b9517c3c53 181
daniele 0:c3b9517c3c53 182 printf("tcp0mq> BIND\n");
daniele 0:c3b9517c3c53 183 if (pico_socket_bind(s, &inaddr_any, &port)!= 0) {
daniele 0:c3b9517c3c53 184 printf("tcp0mq> BIND failed because %s\n", strerror(pico_err));
daniele 0:c3b9517c3c53 185 while(1);;
daniele 0:c3b9517c3c53 186 }
daniele 0:c3b9517c3c53 187
daniele 0:c3b9517c3c53 188 printf("tcp0mq> LISTEN\n");
daniele 0:c3b9517c3c53 189 if (pico_socket_listen(s, 40) != 0)
daniele 0:c3b9517c3c53 190 while(1);;
daniele 0:c3b9517c3c53 191 printf("tcp0mq> listening port %u ...\n",short_be(port));
daniele 0:c3b9517c3c53 192
daniele 0:c3b9517c3c53 193 while(1) {
daniele 0:c3b9517c3c53 194 pico_stack_tick();
daniele 0:c3b9517c3c53 195 wait(0.001);
daniele 0:c3b9517c3c53 196 if((counter++ > 500) && (Handshake_state == ST_RDY)) {
daniele 0:c3b9517c3c53 197 zmq_send(zmq_sock, "HELLO WORLD", 10);
daniele 0:c3b9517c3c53 198 counter = 0;
daniele 0:c3b9517c3c53 199 myled = !myled;
daniele 0:c3b9517c3c53 200 }
daniele 0:c3b9517c3c53 201 }
daniele 0:c3b9517c3c53 202 }