Free (GPLv2) TCP/IP stack developed by TASS Belgium

Dependents:   lpc1768-picotcp-demo ZeroMQ_PicoTCP_Publisher_demo TCPSocket_HelloWorld_PicoTCP Pico_TCP_UDP_Test ... more

PicoTCP. Copyright (c) 2013 TASS Belgium NV.

Released under the GNU General Public License, version 2.

Different licensing models may exist, at the sole discretion of the Copyright holders.

Official homepage: http://www.picotcp.com

Bug tracker: https://github.com/tass-belgium/picotcp/issues

Development steps:

  • initial integration with mbed RTOS
  • generic mbed Ethernet driver
  • high performance NXP LPC1768 specific Ethernet driver
  • Multi-threading support for mbed RTOS
  • Berkeley sockets and integration with the New Socket API
  • Fork of the apps running on top of the New Socket API
  • Scheduling optimizations
  • Debugging/benchmarking/testing

Demo application (measuring TCP sender performance):

Import programlpc1768-picotcp-demo

A PicoTCP demo app testing the ethernet throughput on the lpc1768 mbed board.

Committer:
TASS Belgium NV
Date:
Mon Dec 16 11:25:54 2013 +0100
Revision:
131:4758606c9316
Parent:
122:5b1e9de8bf7f
Child:
135:a064a384eae6
Syncronized with master branch

Who changed what in which revision?

UserRevisionLine numberNew contents of line
tass 68:0847e35d08a6 1 /*********************************************************************
TASS Belgium NV 131:4758606c9316 2 PicoTCP. Copyright (c) 2012 TASS Belgium NV. Some rights reserved.
TASS Belgium NV 131:4758606c9316 3 See LICENSE and COPYING for usage.
tass 68:0847e35d08a6 4
TASS Belgium NV 131:4758606c9316 5 Authors: Daniele Lacamera
TASS Belgium NV 131:4758606c9316 6 *********************************************************************/
tass 68:0847e35d08a6 7
tass 68:0847e35d08a6 8 #include "pico_stack.h"
tass 68:0847e35d08a6 9 #include "pico_config.h"
tass 68:0847e35d08a6 10 #include "pico_ipv4.h"
tass 68:0847e35d08a6 11 #include "pico_socket.h"
tass 68:0847e35d08a6 12 #include "pico_zmq.h"
tass 68:0847e35d08a6 13
tass 68:0847e35d08a6 14 #define MY_VERSION 1u
tass 68:0847e35d08a6 15
TASS Belgium NV 131:4758606c9316 16
tass 68:0847e35d08a6 17 enum zmq_state {
TASS Belgium NV 131:4758606c9316 18 ST_OPEN = 0,
TASS Belgium NV 131:4758606c9316 19 ST_CONNECTED,
TASS Belgium NV 131:4758606c9316 20 ST_SIGNATURE,
TASS Belgium NV 131:4758606c9316 21 ST_VERSION,
TASS Belgium NV 131:4758606c9316 22 ST_GREETING,
TASS Belgium NV 131:4758606c9316 23 ST_RDY,
TASS Belgium NV 131:4758606c9316 24 ST_BUSY
tass 68:0847e35d08a6 25 };
tass 68:0847e35d08a6 26
tass 68:0847e35d08a6 27 enum zmq_role {
TASS Belgium NV 131:4758606c9316 28 ROLE_NONE = 0,
TASS Belgium NV 131:4758606c9316 29 ROLE_PUBLISHER,
TASS Belgium NV 131:4758606c9316 30 ROLE_SUBSCRIBER
tass 68:0847e35d08a6 31 };
tass 68:0847e35d08a6 32
tass 68:0847e35d08a6 33 struct __attribute__((packed)) zmq_msg {
TASS Belgium NV 131:4758606c9316 34 uint8_t flags;
tass 68:0847e35d08a6 35 uint8_t len;
TASS Belgium NV 131:4758606c9316 36 char txt[];
tass 68:0847e35d08a6 37 };
tass 68:0847e35d08a6 38
tass 68:0847e35d08a6 39 struct zmq_socket;
tass 68:0847e35d08a6 40
tass 68:0847e35d08a6 41 struct zmq_connector {
TASS Belgium NV 131:4758606c9316 42 struct pico_socket *sock;
TASS Belgium NV 131:4758606c9316 43 enum zmq_state state;
TASS Belgium NV 131:4758606c9316 44 ZMQ parent;
TASS Belgium NV 131:4758606c9316 45 enum zmq_role role;
TASS Belgium NV 131:4758606c9316 46 uint8_t bytes_received;
TASS Belgium NV 131:4758606c9316 47 struct zmq_connector *next;
tass 68:0847e35d08a6 48 };
tass 68:0847e35d08a6 49
tass 68:0847e35d08a6 50 struct zmq_socket {
TASS Belgium NV 131:4758606c9316 51 struct pico_socket *sock;
TASS Belgium NV 131:4758606c9316 52 void (*ready)(ZMQ z);
TASS Belgium NV 131:4758606c9316 53 enum zmq_state state;
TASS Belgium NV 131:4758606c9316 54 struct zmq_connector *subs;
TASS Belgium NV 131:4758606c9316 55 enum zmq_role role;
tass 68:0847e35d08a6 56 };
tass 68:0847e35d08a6 57
tass 68:0847e35d08a6 58 static int zmq_socket_cmp(void *ka, void *kb)
tass 68:0847e35d08a6 59 {
TASS Belgium NV 131:4758606c9316 60 ZMQ a = ka;
TASS Belgium NV 131:4758606c9316 61 ZMQ b = kb;
TASS Belgium NV 131:4758606c9316 62 if (a->sock < b->sock)
TASS Belgium NV 131:4758606c9316 63 return -1;
TASS Belgium NV 131:4758606c9316 64
TASS Belgium NV 131:4758606c9316 65 if (b->sock < a->sock)
TASS Belgium NV 131:4758606c9316 66 return 1;
TASS Belgium NV 131:4758606c9316 67
TASS Belgium NV 131:4758606c9316 68 return 0;
tass 68:0847e35d08a6 69 }
tass 68:0847e35d08a6 70 PICO_TREE_DECLARE(zmq_sockets, zmq_socket_cmp);
tass 68:0847e35d08a6 71
tass 68:0847e35d08a6 72 static inline ZMQ ZMTP(struct pico_socket *s)
tass 68:0847e35d08a6 73 {
TASS Belgium NV 131:4758606c9316 74 struct zmq_socket tst = {
TASS Belgium NV 131:4758606c9316 75 .sock = s
TASS Belgium NV 131:4758606c9316 76 };
TASS Belgium NV 131:4758606c9316 77 return (pico_tree_findKey(&zmq_sockets, &tst));
tass 68:0847e35d08a6 78 }
tass 68:0847e35d08a6 79
tass 68:0847e35d08a6 80 static inline struct zmq_connector *find_subscriber(struct pico_socket *s)
tass 68:0847e35d08a6 81 {
TASS Belgium NV 131:4758606c9316 82 ZMQ search;
TASS Belgium NV 131:4758606c9316 83 struct pico_tree_node *idx;
TASS Belgium NV 131:4758606c9316 84 struct zmq_connector *el;
TASS Belgium NV 131:4758606c9316 85 pico_tree_foreach(idx, &zmq_sockets) {
TASS Belgium NV 131:4758606c9316 86 search = idx->keyValue;
TASS Belgium NV 131:4758606c9316 87 el = search->subs;
TASS Belgium NV 131:4758606c9316 88 while(el) {
TASS Belgium NV 131:4758606c9316 89 if (el->sock == s)
TASS Belgium NV 131:4758606c9316 90 return el;
TASS Belgium NV 131:4758606c9316 91
TASS Belgium NV 131:4758606c9316 92 el = el->next;
TASS Belgium NV 131:4758606c9316 93 }
tass 68:0847e35d08a6 94 }
TASS Belgium NV 131:4758606c9316 95 return NULL;
tass 68:0847e35d08a6 96 }
tass 68:0847e35d08a6 97
tass 68:0847e35d08a6 98
tass 68:0847e35d08a6 99 static void zmq_connector_add(ZMQ z, struct zmq_connector *zc)
tass 68:0847e35d08a6 100 {
TASS Belgium NV 131:4758606c9316 101 zc->next = z->subs;
TASS Belgium NV 131:4758606c9316 102 z->subs = zc;
TASS Belgium NV 131:4758606c9316 103 zc->parent = z;
TASS Belgium NV 131:4758606c9316 104 dbg("Added connector %p, sock is %p\n", zc, zc->sock);
tass 68:0847e35d08a6 105 }
tass 68:0847e35d08a6 106
tass 68:0847e35d08a6 107 static void zmq_connector_del(struct zmq_connector *zc)
tass 68:0847e35d08a6 108 {
TASS Belgium NV 131:4758606c9316 109 ZMQ z = zc->parent;
TASS Belgium NV 131:4758606c9316 110 if(z) {
TASS Belgium NV 131:4758606c9316 111 struct zmq_connector *el = z->subs, *prev = NULL; /* el = pointer to linked list */
TASS Belgium NV 131:4758606c9316 112 while(el) {
TASS Belgium NV 131:4758606c9316 113 if (el == zc) { /* did we find the connector that we want to delete? */
TASS Belgium NV 131:4758606c9316 114 if (prev) /* was there a previous list item? */
TASS Belgium NV 131:4758606c9316 115 prev->next = zc->next; /* link the linked list again */
TASS Belgium NV 131:4758606c9316 116 else
TASS Belgium NV 131:4758606c9316 117 z->subs = zc->next; /* we were at the beginning of the list */
TASS Belgium NV 131:4758606c9316 118
TASS Belgium NV 131:4758606c9316 119 break;
TASS Belgium NV 131:4758606c9316 120 }
TASS Belgium NV 131:4758606c9316 121
TASS Belgium NV 131:4758606c9316 122 prev = el;
TASS Belgium NV 131:4758606c9316 123 el = el->next;
TASS Belgium NV 131:4758606c9316 124 }
tass 68:0847e35d08a6 125 }
TASS Belgium NV 131:4758606c9316 126
TASS Belgium NV 131:4758606c9316 127 pico_socket_close(zc->sock);
TASS Belgium NV 131:4758606c9316 128 pico_free(zc);
tass 68:0847e35d08a6 129 }
tass 68:0847e35d08a6 130
TASS Belgium NV 131:4758606c9316 131 static void zmq_check_state(ZMQ z)
tass 68:0847e35d08a6 132 {
TASS Belgium NV 131:4758606c9316 133 struct zmq_connector *c = z->subs;
TASS Belgium NV 131:4758606c9316 134 enum zmq_state default_state, option_state;
TASS Belgium NV 131:4758606c9316 135 if ((z->state != ST_RDY) && (z->state != ST_BUSY))
TASS Belgium NV 131:4758606c9316 136 return;
TASS Belgium NV 131:4758606c9316 137
TASS Belgium NV 131:4758606c9316 138 if (z->role == ROLE_SUBSCRIBER) {
TASS Belgium NV 131:4758606c9316 139 default_state = ST_RDY;
TASS Belgium NV 131:4758606c9316 140 option_state = ST_BUSY;
TASS Belgium NV 131:4758606c9316 141 } else {
TASS Belgium NV 131:4758606c9316 142 default_state = ST_BUSY;
TASS Belgium NV 131:4758606c9316 143 option_state = ST_RDY;
tass 68:0847e35d08a6 144 }
TASS Belgium NV 131:4758606c9316 145
TASS Belgium NV 131:4758606c9316 146 z->state = default_state;
TASS Belgium NV 131:4758606c9316 147 while(c) {
TASS Belgium NV 131:4758606c9316 148 if (c->state == option_state) {
TASS Belgium NV 131:4758606c9316 149 z->state = option_state;
TASS Belgium NV 131:4758606c9316 150 return;
TASS Belgium NV 131:4758606c9316 151 }
TASS Belgium NV 131:4758606c9316 152
TASS Belgium NV 131:4758606c9316 153 c = c->next;
TASS Belgium NV 131:4758606c9316 154 }
tass 68:0847e35d08a6 155 }
tass 68:0847e35d08a6 156
tass 68:0847e35d08a6 157
tass 68:0847e35d08a6 158 static void zmq_hs_connected(struct zmq_connector *z)
tass 68:0847e35d08a6 159 {
TASS Belgium NV 131:4758606c9316 160 /* v2 signature */
TASS Belgium NV 131:4758606c9316 161 uint8_t my_signature[14] = {
TASS Belgium NV 131:4758606c9316 162 0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f, 1, 1, 0, 0
TASS Belgium NV 131:4758606c9316 163 };
tass 68:0847e35d08a6 164
TASS Belgium NV 131:4758606c9316 165 /* uint8_t my_ver[2] = {MY_VERSION, 0}; */
TASS Belgium NV 131:4758606c9316 166 /* uint8_t my_greeting[52] = {'N','U','L','L', 0}; */
tass 68:0847e35d08a6 167
TASS Belgium NV 131:4758606c9316 168 pico_socket_write(z->sock, my_signature, 14);
TASS Belgium NV 131:4758606c9316 169 /* pico_socket_write(z->sock, my_ver, 2); */
tass 68:0847e35d08a6 170
TASS Belgium NV 131:4758606c9316 171 /* if (MY_VERSION > 2) */
TASS Belgium NV 131:4758606c9316 172 /* pico_socket_write(z->sock, my_greeting, 52); */
tass 68:0847e35d08a6 173
TASS Belgium NV 131:4758606c9316 174 z->state = ST_SIGNATURE;
TASS Belgium NV 131:4758606c9316 175 /* z->state = ST_RDY; */
tass 68:0847e35d08a6 176 }
TASS Belgium NV 131:4758606c9316 177
tass 68:0847e35d08a6 178 static void zmq_hs_signature(struct zmq_connector *zc)
tass 68:0847e35d08a6 179 {
TASS Belgium NV 131:4758606c9316 180 uint8_t incoming[20];
TASS Belgium NV 131:4758606c9316 181 int ret;
TASS Belgium NV 131:4758606c9316 182
TASS Belgium NV 131:4758606c9316 183 ret = pico_socket_read(zc->sock, incoming, 14);
TASS Belgium NV 131:4758606c9316 184 if (zc->bytes_received == 0 && ret > 0 && incoming[0] != 0xFF) {
TASS Belgium NV 131:4758606c9316 185 /* dbg("Received invalid signature: [0]!=0xFF\n"); */
TASS Belgium NV 131:4758606c9316 186 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 187 }
tass 68:0847e35d08a6 188
TASS Belgium NV 131:4758606c9316 189 zc->bytes_received = (uint8_t)(zc->bytes_received + ret);
TASS Belgium NV 131:4758606c9316 190 if (zc->bytes_received < 14) {
TASS Belgium NV 131:4758606c9316 191 /* dbg("Waiting for the rest of the sig - got %u bytes\n",zc->bytes_received); */
TASS Belgium NV 131:4758606c9316 192 return;
TASS Belgium NV 131:4758606c9316 193 }
TASS Belgium NV 131:4758606c9316 194
TASS Belgium NV 131:4758606c9316 195 /* dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); */
TASS Belgium NV 131:4758606c9316 196 zc->state = ST_RDY;
tass 68:0847e35d08a6 197 }
TASS Belgium NV 131:4758606c9316 198
tass 68:0847e35d08a6 199 static void zmq_hs_version(struct zmq_connector *zc)
tass 68:0847e35d08a6 200 {
TASS Belgium NV 131:4758606c9316 201 uint8_t incoming[20];
TASS Belgium NV 131:4758606c9316 202 int ret;
TASS Belgium NV 131:4758606c9316 203 ret = pico_socket_read(zc->sock, incoming, 2);
TASS Belgium NV 131:4758606c9316 204 if (ret < 0) {
TASS Belgium NV 131:4758606c9316 205 dbg("Cannot exchange valid version information. Read returned -1\n");
TASS Belgium NV 131:4758606c9316 206 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 207 return;
TASS Belgium NV 131:4758606c9316 208 }
TASS Belgium NV 131:4758606c9316 209
TASS Belgium NV 131:4758606c9316 210 if (ret == 0)
TASS Belgium NV 131:4758606c9316 211 return;
TASS Belgium NV 131:4758606c9316 212
TASS Belgium NV 131:4758606c9316 213 /* Version check?
TASS Belgium NV 131:4758606c9316 214 if (incoming[0] != 3) {
tass 68:0847e35d08a6 215 dbg("Version %d.x not supported by this publisher\n", incoming[0]);
tass 68:0847e35d08a6 216 zmq_connector_del(zc);
tass 68:0847e35d08a6 217 return;
TASS Belgium NV 131:4758606c9316 218 }
TASS Belgium NV 131:4758606c9316 219 dbg("Subscriber is using version 3. Good!\n");
TASS Belgium NV 131:4758606c9316 220 */
TASS Belgium NV 131:4758606c9316 221 dbg("Subscriber is using version %d. Good!\n", incoming[0]);
TASS Belgium NV 131:4758606c9316 222 if (incoming[0] == 3)
TASS Belgium NV 131:4758606c9316 223 zc->state = ST_GREETING;
TASS Belgium NV 131:4758606c9316 224 else
TASS Belgium NV 131:4758606c9316 225 zc->state = ST_RDY;
tass 68:0847e35d08a6 226 }
TASS Belgium NV 131:4758606c9316 227
tass 68:0847e35d08a6 228 static void zmq_hs_greeting(struct zmq_connector *zc)
tass 68:0847e35d08a6 229 {
TASS Belgium NV 131:4758606c9316 230 uint8_t incoming[64];
TASS Belgium NV 131:4758606c9316 231 int ret;
TASS Belgium NV 131:4758606c9316 232 ret = pico_socket_read(zc->sock, incoming, 64);
TASS Belgium NV 131:4758606c9316 233 dbg("zmq_socket_read in greeting returned %d\n", ret);
TASS Belgium NV 131:4758606c9316 234 if (ret == 0)
TASS Belgium NV 131:4758606c9316 235 return;
TASS Belgium NV 131:4758606c9316 236
TASS Belgium NV 131:4758606c9316 237 if (ret < 0) {
TASS Belgium NV 131:4758606c9316 238 dbg("Cannot retrieve valid greeting\n");
TASS Belgium NV 131:4758606c9316 239 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 240 return;
TASS Belgium NV 131:4758606c9316 241 }
TASS Belgium NV 131:4758606c9316 242
TASS Belgium NV 131:4758606c9316 243 zc->state = ST_RDY;
TASS Belgium NV 131:4758606c9316 244 zmq_check_state(zc->parent);
TASS Belgium NV 131:4758606c9316 245 dbg("Paired. Sending Ready.\n");
TASS Belgium NV 131:4758606c9316 246 pico_socket_write(zc->sock, "READY ", 8);
tass 68:0847e35d08a6 247 }
tass 68:0847e35d08a6 248
tass 68:0847e35d08a6 249 static void zmq_hs_rdy(struct zmq_connector *zc)
tass 68:0847e35d08a6 250 {
tass 68:0847e35d08a6 251 int ret;
tass 68:0847e35d08a6 252 uint8_t incoming[258];
tass 68:0847e35d08a6 253 if (zc->role == ROLE_SUBSCRIBER)
TASS Belgium NV 131:4758606c9316 254 return;
TASS Belgium NV 131:4758606c9316 255
tass 68:0847e35d08a6 256 ret = pico_socket_read(zc->sock, incoming, 258);
tass 68:0847e35d08a6 257 dbg("Got %d bytes from subscriber whilst in rdy state.\n", ret);
tass 68:0847e35d08a6 258 }
tass 68:0847e35d08a6 259
tass 68:0847e35d08a6 260 static void zmq_hs_busy(struct zmq_connector *zc)
tass 68:0847e35d08a6 261 {
TASS Belgium NV 131:4758606c9316 262 int was_busy = 0;
TASS Belgium NV 131:4758606c9316 263 if (zc->parent->state == ST_BUSY)
TASS Belgium NV 131:4758606c9316 264 was_busy = 1;
TASS Belgium NV 131:4758606c9316 265
TASS Belgium NV 131:4758606c9316 266 zmq_check_state(zc->parent);
TASS Belgium NV 131:4758606c9316 267 if (was_busy && (zc->parent->state == ST_RDY) && zc->parent->ready)
TASS Belgium NV 131:4758606c9316 268 zc->parent->ready(zc->parent);
tass 68:0847e35d08a6 269 }
TASS Belgium NV 131:4758606c9316 270
TASS Belgium NV 131:4758606c9316 271 static void (*zmq_hs_cb[])(struct zmq_connector *) = {
tass 68:0847e35d08a6 272 NULL,
tass 68:0847e35d08a6 273 zmq_hs_connected,
tass 68:0847e35d08a6 274 zmq_hs_signature,
tass 68:0847e35d08a6 275 zmq_hs_version,
tass 68:0847e35d08a6 276 zmq_hs_greeting,
tass 68:0847e35d08a6 277 zmq_hs_rdy,
tass 68:0847e35d08a6 278 zmq_hs_busy
tass 68:0847e35d08a6 279 };
tass 68:0847e35d08a6 280
tass 68:0847e35d08a6 281
tass 68:0847e35d08a6 282 static void cb_tcp0mq(uint16_t ev, struct pico_socket *s)
tass 68:0847e35d08a6 283 {
TASS Belgium NV 131:4758606c9316 284 struct pico_ip4 orig;
TASS Belgium NV 131:4758606c9316 285 uint16_t port;
TASS Belgium NV 131:4758606c9316 286 char peer[30];
TASS Belgium NV 131:4758606c9316 287 struct zmq_connector *z_a, *zc;
TASS Belgium NV 131:4758606c9316 288 ZMQ z = ZMTP(s);
TASS Belgium NV 131:4758606c9316 289
TASS Belgium NV 131:4758606c9316 290 /* Publisher. Accepting new subscribers */
TASS Belgium NV 131:4758606c9316 291 if (z) {
TASS Belgium NV 131:4758606c9316 292 if (ev & PICO_SOCK_EV_CONN) {
TASS Belgium NV 131:4758606c9316 293 z_a = pico_zalloc(sizeof(struct zmq_socket));
TASS Belgium NV 131:4758606c9316 294 if (z_a == NULL)
TASS Belgium NV 131:4758606c9316 295 return;
TASS Belgium NV 131:4758606c9316 296
TASS Belgium NV 131:4758606c9316 297 z_a->sock = pico_socket_accept(s, &orig, &port);
TASS Belgium NV 131:4758606c9316 298 pico_ipv4_to_string(peer, orig.addr);
TASS Belgium NV 131:4758606c9316 299 dbg("tcp0mq> Connection requested by %s:%u.\n", peer, short_be(port));
TASS Belgium NV 131:4758606c9316 300 if (z->state == ST_OPEN) {
TASS Belgium NV 131:4758606c9316 301 dbg("tcp0mq> Accepted connection! New subscriber on sock %p.\n", z_a->sock);
TASS Belgium NV 131:4758606c9316 302 zmq_connector_add(z, z_a);
TASS Belgium NV 131:4758606c9316 303 z_a->role = ROLE_PUBLISHER;
TASS Belgium NV 131:4758606c9316 304 z_a->state = ST_CONNECTED;
TASS Belgium NV 131:4758606c9316 305 zmq_hs_connected(z_a);
TASS Belgium NV 131:4758606c9316 306 } else {
TASS Belgium NV 131:4758606c9316 307 dbg("tcp0mq> Server busy, connection rejected\n");
TASS Belgium NV 131:4758606c9316 308 pico_socket_close(z_a->sock);
TASS Belgium NV 131:4758606c9316 309 }
TASS Belgium NV 131:4758606c9316 310 }
TASS Belgium NV 131:4758606c9316 311
tass 68:0847e35d08a6 312 return;
tass 68:0847e35d08a6 313 }
tass 68:0847e35d08a6 314
TASS Belgium NV 131:4758606c9316 315 zc = find_subscriber(s);
TASS Belgium NV 131:4758606c9316 316 if (!zc) {
TASS Belgium NV 131:4758606c9316 317 dbg("Cannot find subscriber with socket %p, ev = %d!\n", s, ev);
TASS Belgium NV 131:4758606c9316 318 /* pico_socket_close(s); */
TASS Belgium NV 131:4758606c9316 319 return;
TASS Belgium NV 131:4758606c9316 320 }
tass 68:0847e35d08a6 321
TASS Belgium NV 131:4758606c9316 322 if ((ev & PICO_SOCK_EV_CONN) && zc->role == ROLE_SUBSCRIBER && zc->state == ST_OPEN)
TASS Belgium NV 131:4758606c9316 323 {
TASS Belgium NV 131:4758606c9316 324 zc->state = ST_CONNECTED;
TASS Belgium NV 131:4758606c9316 325 zmq_hs_connected(zc);
TASS Belgium NV 131:4758606c9316 326 }
tass 68:0847e35d08a6 327
tass 68:0847e35d08a6 328
TASS Belgium NV 131:4758606c9316 329 if (ev & PICO_SOCK_EV_RD) {
TASS Belgium NV 131:4758606c9316 330 if (zmq_hs_cb[zc->state])
TASS Belgium NV 131:4758606c9316 331 zmq_hs_cb[zc->state](zc);
TASS Belgium NV 131:4758606c9316 332 }
TASS Belgium NV 131:4758606c9316 333
TASS Belgium NV 131:4758606c9316 334 if ((ev & PICO_SOCK_EV_WR) && zc->parent && (zc->parent->role == ROLE_PUBLISHER) && (zc->state == ST_BUSY)) {
TASS Belgium NV 131:4758606c9316 335 if (zmq_hs_cb[zc->state])
TASS Belgium NV 131:4758606c9316 336 zmq_hs_cb[zc->state](zc);
TASS Belgium NV 131:4758606c9316 337 }
TASS Belgium NV 131:4758606c9316 338
tass 68:0847e35d08a6 339
TASS Belgium NV 131:4758606c9316 340 if (ev & PICO_SOCK_EV_FIN) {
TASS Belgium NV 131:4758606c9316 341 dbg("tcp0mq> Connection closed.\n");
TASS Belgium NV 131:4758606c9316 342 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 343 }
TASS Belgium NV 131:4758606c9316 344
TASS Belgium NV 131:4758606c9316 345 if (ev & PICO_SOCK_EV_ERR) {
TASS Belgium NV 131:4758606c9316 346 dbg("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err));
TASS Belgium NV 131:4758606c9316 347 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 348 }
TASS Belgium NV 131:4758606c9316 349
TASS Belgium NV 131:4758606c9316 350 if (ev & PICO_SOCK_EV_CLOSE) {
TASS Belgium NV 131:4758606c9316 351 dbg("tcp0mq> event close\n");
TASS Belgium NV 131:4758606c9316 352 zmq_connector_del(zc);
TASS Belgium NV 131:4758606c9316 353 }
TASS Belgium NV 131:4758606c9316 354
tass 68:0847e35d08a6 355 }
tass 68:0847e35d08a6 356
tass 68:0847e35d08a6 357 ZMQ zmq_subscriber(void (*cb)(ZMQ z))
tass 68:0847e35d08a6 358 {
TASS Belgium NV 131:4758606c9316 359 ZMQ z = pico_zalloc(sizeof(struct zmq_socket));
TASS Belgium NV 131:4758606c9316 360 if (!z) {
TASS Belgium NV 131:4758606c9316 361 pico_err = PICO_ERR_ENOMEM;
TASS Belgium NV 131:4758606c9316 362 return NULL;
TASS Belgium NV 131:4758606c9316 363 }
TASS Belgium NV 131:4758606c9316 364
TASS Belgium NV 131:4758606c9316 365 z->state = ST_BUSY;
TASS Belgium NV 131:4758606c9316 366 z->ready = cb;
TASS Belgium NV 131:4758606c9316 367 z->role = ROLE_SUBSCRIBER;
TASS Belgium NV 131:4758606c9316 368 pico_tree_insert(&zmq_sockets, z);
TASS Belgium NV 131:4758606c9316 369 return z;
tass 68:0847e35d08a6 370 }
tass 68:0847e35d08a6 371
TASS Belgium NV 131:4758606c9316 372 int zmq_connect(ZMQ z, char *address, uint16_t port)
tass 68:0847e35d08a6 373 {
TASS Belgium NV 131:4758606c9316 374 struct pico_ip4 ip = {
TASS Belgium NV 131:4758606c9316 375 0
TASS Belgium NV 131:4758606c9316 376 };
TASS Belgium NV 131:4758606c9316 377 struct zmq_connector *z_c;
TASS Belgium NV 131:4758606c9316 378 uint8_t sockopts = 1;
TASS Belgium NV 131:4758606c9316 379 if (pico_string_to_ipv4(address, &ip.addr) < 0) {
TASS Belgium NV 131:4758606c9316 380 dbg("FIXME!! I need to synchronize with the dns client to get to my publisher :(\n");
TASS Belgium NV 131:4758606c9316 381 return -1;
TASS Belgium NV 131:4758606c9316 382 }
tass 68:0847e35d08a6 383
TASS Belgium NV 131:4758606c9316 384 z_c = pico_zalloc(sizeof(struct zmq_connector));
TASS Belgium NV 131:4758606c9316 385 if (!z_c)
TASS Belgium NV 131:4758606c9316 386 return -1;
TASS Belgium NV 131:4758606c9316 387
TASS Belgium NV 131:4758606c9316 388 z_c->role = ROLE_SUBSCRIBER;
TASS Belgium NV 131:4758606c9316 389 z_c->state = ST_OPEN;
TASS Belgium NV 131:4758606c9316 390 z_c->sock = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
TASS Belgium NV 131:4758606c9316 391 if (!z_c->sock) {
TASS Belgium NV 131:4758606c9316 392 pico_free(z_c);
TASS Belgium NV 131:4758606c9316 393 return -1;
TASS Belgium NV 131:4758606c9316 394 }
TASS Belgium NV 131:4758606c9316 395
TASS Belgium NV 131:4758606c9316 396 pico_socket_setoption(z_c->sock, PICO_TCP_NODELAY, &sockopts);
TASS Belgium NV 131:4758606c9316 397 if (pico_socket_connect(z_c->sock, &ip, short_be(port)) < 0)
TASS Belgium NV 131:4758606c9316 398 return -1;
TASS Belgium NV 131:4758606c9316 399
TASS Belgium NV 131:4758606c9316 400 zmq_connector_add(z, z_c);
TASS Belgium NV 131:4758606c9316 401 return 0;
tass 68:0847e35d08a6 402 }
tass 68:0847e35d08a6 403
tass 68:0847e35d08a6 404 ZMQ zmq_publisher(uint16_t _port, void (*cb)(ZMQ z))
tass 68:0847e35d08a6 405 {
TASS Belgium NV 131:4758606c9316 406 struct pico_socket *s;
TASS Belgium NV 131:4758606c9316 407 struct pico_ip4 inaddr_any = {
TASS Belgium NV 131:4758606c9316 408 0
TASS Belgium NV 131:4758606c9316 409 };
TASS Belgium NV 131:4758606c9316 410 uint8_t sockopts = 1;
TASS Belgium NV 131:4758606c9316 411 uint16_t port = short_be(_port);
TASS Belgium NV 131:4758606c9316 412 ZMQ z = NULL;
TASS Belgium NV 131:4758606c9316 413 s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq);
TASS Belgium NV 131:4758606c9316 414 if (!s)
TASS Belgium NV 131:4758606c9316 415 return NULL;
TASS Belgium NV 131:4758606c9316 416
TASS Belgium NV 131:4758606c9316 417 pico_socket_setoption(s, PICO_TCP_NODELAY, &sockopts);
TASS Belgium NV 131:4758606c9316 418
TASS Belgium NV 131:4758606c9316 419 dbg("zmq_publisher: BIND\n");
TASS Belgium NV 131:4758606c9316 420 if (pico_socket_bind(s, &inaddr_any, &port) != 0) {
TASS Belgium NV 131:4758606c9316 421 dbg("zmq publisher: BIND failed\n");
TASS Belgium NV 131:4758606c9316 422 return NULL;
TASS Belgium NV 131:4758606c9316 423 }
tass 122:5b1e9de8bf7f 424
TASS Belgium NV 131:4758606c9316 425 if (pico_socket_listen(s, 2) != 0) {
TASS Belgium NV 131:4758606c9316 426 dbg("zmq publisher: LISTEN failed\n");
TASS Belgium NV 131:4758606c9316 427 return NULL;
TASS Belgium NV 131:4758606c9316 428 }
TASS Belgium NV 131:4758606c9316 429
TASS Belgium NV 131:4758606c9316 430 dbg("zmq_publisher: Active and bound to local port %d\n", short_be(port));
tass 68:0847e35d08a6 431
TASS Belgium NV 131:4758606c9316 432 z = pico_zalloc(sizeof(struct zmq_socket));
TASS Belgium NV 131:4758606c9316 433 if (!z) {
TASS Belgium NV 131:4758606c9316 434 pico_socket_close(s);
TASS Belgium NV 131:4758606c9316 435 pico_err = PICO_ERR_ENOMEM;
TASS Belgium NV 131:4758606c9316 436 return NULL;
TASS Belgium NV 131:4758606c9316 437 }
TASS Belgium NV 131:4758606c9316 438
TASS Belgium NV 131:4758606c9316 439 z->sock = s;
TASS Belgium NV 131:4758606c9316 440 z->state = ST_OPEN;
TASS Belgium NV 131:4758606c9316 441 z->ready = cb;
TASS Belgium NV 131:4758606c9316 442 z->role = ROLE_PUBLISHER;
TASS Belgium NV 131:4758606c9316 443 z->subs = NULL;
TASS Belgium NV 131:4758606c9316 444 pico_tree_insert(&zmq_sockets, z);
TASS Belgium NV 131:4758606c9316 445 dbg("zmq publisher created.\n");
TASS Belgium NV 131:4758606c9316 446 return z;
tass 68:0847e35d08a6 447 }
tass 68:0847e35d08a6 448
tass 68:0847e35d08a6 449 int zmq_send(ZMQ z, char *txt, int len)
tass 68:0847e35d08a6 450 {
tass 68:0847e35d08a6 451 struct zmq_msg *msg;
tass 68:0847e35d08a6 452 struct zmq_connector *c = z->subs;
tass 68:0847e35d08a6 453 int ret = 0;
tass 68:0847e35d08a6 454
TASS Belgium NV 131:4758606c9316 455 if (!c)
tass 68:0847e35d08a6 456 {
tass 68:0847e35d08a6 457 dbg("no subscribers, bailing out\n");
tass 68:0847e35d08a6 458 return 0; /* Need at least one subscriber */
tass 68:0847e35d08a6 459 }
TASS Belgium NV 131:4758606c9316 460
tass 70:cd218dd180e5 461 msg = pico_zalloc((size_t)(len + 2));
tass 68:0847e35d08a6 462 msg->flags = 4;
tass 68:0847e35d08a6 463 msg->len = (uint8_t) len;
TASS Belgium NV 131:4758606c9316 464 memcpy(msg->txt, txt, (size_t) len);
tass 68:0847e35d08a6 465
tass 68:0847e35d08a6 466 while (c) {
TASS Belgium NV 131:4758606c9316 467 dbg("write to %u\n", c->state);
TASS Belgium NV 131:4758606c9316 468 if ((ST_RDY == c->state) && (pico_socket_write(c->sock, msg, len + 2) > 0))
TASS Belgium NV 131:4758606c9316 469 ret++;
TASS Belgium NV 131:4758606c9316 470
TASS Belgium NV 131:4758606c9316 471 c = c->next;
tass 68:0847e35d08a6 472 }
tass 68:0847e35d08a6 473 pico_free(msg);
tass 68:0847e35d08a6 474 return ret;
tass 68:0847e35d08a6 475 }
tass 68:0847e35d08a6 476
tass 68:0847e35d08a6 477 int zmq_recv(ZMQ z, char *txt)
tass 68:0847e35d08a6 478 {
TASS Belgium NV 131:4758606c9316 479 int ret;
TASS Belgium NV 131:4758606c9316 480 struct zmq_msg msg;
TASS Belgium NV 131:4758606c9316 481 struct zmq_connector *nxt, *c = z->subs;
TASS Belgium NV 131:4758606c9316 482 if (z->state != ST_RDY)
TASS Belgium NV 131:4758606c9316 483 return 0;
TASS Belgium NV 131:4758606c9316 484
TASS Belgium NV 131:4758606c9316 485 while (c) {
TASS Belgium NV 131:4758606c9316 486 nxt = c->next;
TASS Belgium NV 131:4758606c9316 487 ret = pico_socket_read(c->sock, &msg, 2);
TASS Belgium NV 131:4758606c9316 488 if (ret < 0) {
TASS Belgium NV 131:4758606c9316 489 dbg("Error reading!\n");
TASS Belgium NV 131:4758606c9316 490 zmq_connector_del(c);
TASS Belgium NV 131:4758606c9316 491 } else if (ret < 2) {
TASS Belgium NV 131:4758606c9316 492 c->state = ST_BUSY;
TASS Belgium NV 131:4758606c9316 493 } else {
TASS Belgium NV 131:4758606c9316 494 return pico_socket_read(c->sock, txt, msg.len);
TASS Belgium NV 131:4758606c9316 495 }
TASS Belgium NV 131:4758606c9316 496
TASS Belgium NV 131:4758606c9316 497 c = nxt;
TASS Belgium NV 131:4758606c9316 498 }
TASS Belgium NV 131:4758606c9316 499 zmq_check_state(z);
tass 68:0847e35d08a6 500 return 0;
tass 68:0847e35d08a6 501 }
tass 68:0847e35d08a6 502
tass 68:0847e35d08a6 503 void zmq_close(ZMQ z)
tass 68:0847e35d08a6 504 {
TASS Belgium NV 131:4758606c9316 505 struct zmq_connector *nxt, *c = z->subs;
TASS Belgium NV 131:4758606c9316 506 while(c) {
TASS Belgium NV 131:4758606c9316 507 nxt = c->next;
TASS Belgium NV 131:4758606c9316 508 zmq_connector_del(c);
TASS Belgium NV 131:4758606c9316 509 c = nxt;
TASS Belgium NV 131:4758606c9316 510 }
TASS Belgium NV 131:4758606c9316 511 pico_socket_close(z->sock);
TASS Belgium NV 131:4758606c9316 512 pico_free(z);
tass 68:0847e35d08a6 513 }