Free (GPLv2) TCP/IP stack developed by TASS Belgium
Dependents: lpc1768-picotcp-demo ZeroMQ_PicoTCP_Publisher_demo TCPSocket_HelloWorld_PicoTCP Pico_TCP_UDP_Test ... more
PicoTCP. Copyright (c) 2013 TASS Belgium NV.
Released under the GNU General Public License, version 2.
Different licensing models may exist, at the sole discretion of the Copyright holders.
Official homepage: http://www.picotcp.com
Bug tracker: https://github.com/tass-belgium/picotcp/issues
Development steps:
initial integration with mbed RTOSgeneric mbed Ethernet driverhigh performance NXP LPC1768 specific Ethernet driverMulti-threading support for mbed RTOSBerkeley sockets and integration with the New Socket APIFork of the apps running on top of the New Socket APIScheduling optimizations- Debugging/benchmarking/testing
Demo application (measuring TCP sender performance):
Import programlpc1768-picotcp-demo
A PicoTCP demo app testing the ethernet throughput on the lpc1768 mbed board.
modules/pico_zmq.c@131:4758606c9316, 2013-12-16 (annotated)
- Committer:
- TASS Belgium NV
- Date:
- Mon Dec 16 11:25:54 2013 +0100
- Revision:
- 131:4758606c9316
- Parent:
- 122:5b1e9de8bf7f
- Child:
- 135:a064a384eae6
Syncronized with master branch
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
tass | 68:0847e35d08a6 | 1 | /********************************************************************* |
TASS Belgium NV |
131:4758606c9316 | 2 | PicoTCP. Copyright (c) 2012 TASS Belgium NV. Some rights reserved. |
TASS Belgium NV |
131:4758606c9316 | 3 | See LICENSE and COPYING for usage. |
tass | 68:0847e35d08a6 | 4 | |
TASS Belgium NV |
131:4758606c9316 | 5 | Authors: Daniele Lacamera |
TASS Belgium NV |
131:4758606c9316 | 6 | *********************************************************************/ |
tass | 68:0847e35d08a6 | 7 | |
tass | 68:0847e35d08a6 | 8 | #include "pico_stack.h" |
tass | 68:0847e35d08a6 | 9 | #include "pico_config.h" |
tass | 68:0847e35d08a6 | 10 | #include "pico_ipv4.h" |
tass | 68:0847e35d08a6 | 11 | #include "pico_socket.h" |
tass | 68:0847e35d08a6 | 12 | #include "pico_zmq.h" |
tass | 68:0847e35d08a6 | 13 | |
tass | 68:0847e35d08a6 | 14 | #define MY_VERSION 1u |
tass | 68:0847e35d08a6 | 15 | |
TASS Belgium NV |
131:4758606c9316 | 16 | |
tass | 68:0847e35d08a6 | 17 | enum zmq_state { |
TASS Belgium NV |
131:4758606c9316 | 18 | ST_OPEN = 0, |
TASS Belgium NV |
131:4758606c9316 | 19 | ST_CONNECTED, |
TASS Belgium NV |
131:4758606c9316 | 20 | ST_SIGNATURE, |
TASS Belgium NV |
131:4758606c9316 | 21 | ST_VERSION, |
TASS Belgium NV |
131:4758606c9316 | 22 | ST_GREETING, |
TASS Belgium NV |
131:4758606c9316 | 23 | ST_RDY, |
TASS Belgium NV |
131:4758606c9316 | 24 | ST_BUSY |
tass | 68:0847e35d08a6 | 25 | }; |
tass | 68:0847e35d08a6 | 26 | |
tass | 68:0847e35d08a6 | 27 | enum zmq_role { |
TASS Belgium NV |
131:4758606c9316 | 28 | ROLE_NONE = 0, |
TASS Belgium NV |
131:4758606c9316 | 29 | ROLE_PUBLISHER, |
TASS Belgium NV |
131:4758606c9316 | 30 | ROLE_SUBSCRIBER |
tass | 68:0847e35d08a6 | 31 | }; |
tass | 68:0847e35d08a6 | 32 | |
tass | 68:0847e35d08a6 | 33 | struct __attribute__((packed)) zmq_msg { |
TASS Belgium NV |
131:4758606c9316 | 34 | uint8_t flags; |
tass | 68:0847e35d08a6 | 35 | uint8_t len; |
TASS Belgium NV |
131:4758606c9316 | 36 | char txt[]; |
tass | 68:0847e35d08a6 | 37 | }; |
tass | 68:0847e35d08a6 | 38 | |
tass | 68:0847e35d08a6 | 39 | struct zmq_socket; |
tass | 68:0847e35d08a6 | 40 | |
tass | 68:0847e35d08a6 | 41 | struct zmq_connector { |
TASS Belgium NV |
131:4758606c9316 | 42 | struct pico_socket *sock; |
TASS Belgium NV |
131:4758606c9316 | 43 | enum zmq_state state; |
TASS Belgium NV |
131:4758606c9316 | 44 | ZMQ parent; |
TASS Belgium NV |
131:4758606c9316 | 45 | enum zmq_role role; |
TASS Belgium NV |
131:4758606c9316 | 46 | uint8_t bytes_received; |
TASS Belgium NV |
131:4758606c9316 | 47 | struct zmq_connector *next; |
tass | 68:0847e35d08a6 | 48 | }; |
tass | 68:0847e35d08a6 | 49 | |
tass | 68:0847e35d08a6 | 50 | struct zmq_socket { |
TASS Belgium NV |
131:4758606c9316 | 51 | struct pico_socket *sock; |
TASS Belgium NV |
131:4758606c9316 | 52 | void (*ready)(ZMQ z); |
TASS Belgium NV |
131:4758606c9316 | 53 | enum zmq_state state; |
TASS Belgium NV |
131:4758606c9316 | 54 | struct zmq_connector *subs; |
TASS Belgium NV |
131:4758606c9316 | 55 | enum zmq_role role; |
tass | 68:0847e35d08a6 | 56 | }; |
tass | 68:0847e35d08a6 | 57 | |
tass | 68:0847e35d08a6 | 58 | static int zmq_socket_cmp(void *ka, void *kb) |
tass | 68:0847e35d08a6 | 59 | { |
TASS Belgium NV |
131:4758606c9316 | 60 | ZMQ a = ka; |
TASS Belgium NV |
131:4758606c9316 | 61 | ZMQ b = kb; |
TASS Belgium NV |
131:4758606c9316 | 62 | if (a->sock < b->sock) |
TASS Belgium NV |
131:4758606c9316 | 63 | return -1; |
TASS Belgium NV |
131:4758606c9316 | 64 | |
TASS Belgium NV |
131:4758606c9316 | 65 | if (b->sock < a->sock) |
TASS Belgium NV |
131:4758606c9316 | 66 | return 1; |
TASS Belgium NV |
131:4758606c9316 | 67 | |
TASS Belgium NV |
131:4758606c9316 | 68 | return 0; |
tass | 68:0847e35d08a6 | 69 | } |
tass | 68:0847e35d08a6 | 70 | PICO_TREE_DECLARE(zmq_sockets, zmq_socket_cmp); |
tass | 68:0847e35d08a6 | 71 | |
tass | 68:0847e35d08a6 | 72 | static inline ZMQ ZMTP(struct pico_socket *s) |
tass | 68:0847e35d08a6 | 73 | { |
TASS Belgium NV |
131:4758606c9316 | 74 | struct zmq_socket tst = { |
TASS Belgium NV |
131:4758606c9316 | 75 | .sock = s |
TASS Belgium NV |
131:4758606c9316 | 76 | }; |
TASS Belgium NV |
131:4758606c9316 | 77 | return (pico_tree_findKey(&zmq_sockets, &tst)); |
tass | 68:0847e35d08a6 | 78 | } |
tass | 68:0847e35d08a6 | 79 | |
tass | 68:0847e35d08a6 | 80 | static inline struct zmq_connector *find_subscriber(struct pico_socket *s) |
tass | 68:0847e35d08a6 | 81 | { |
TASS Belgium NV |
131:4758606c9316 | 82 | ZMQ search; |
TASS Belgium NV |
131:4758606c9316 | 83 | struct pico_tree_node *idx; |
TASS Belgium NV |
131:4758606c9316 | 84 | struct zmq_connector *el; |
TASS Belgium NV |
131:4758606c9316 | 85 | pico_tree_foreach(idx, &zmq_sockets) { |
TASS Belgium NV |
131:4758606c9316 | 86 | search = idx->keyValue; |
TASS Belgium NV |
131:4758606c9316 | 87 | el = search->subs; |
TASS Belgium NV |
131:4758606c9316 | 88 | while(el) { |
TASS Belgium NV |
131:4758606c9316 | 89 | if (el->sock == s) |
TASS Belgium NV |
131:4758606c9316 | 90 | return el; |
TASS Belgium NV |
131:4758606c9316 | 91 | |
TASS Belgium NV |
131:4758606c9316 | 92 | el = el->next; |
TASS Belgium NV |
131:4758606c9316 | 93 | } |
tass | 68:0847e35d08a6 | 94 | } |
TASS Belgium NV |
131:4758606c9316 | 95 | return NULL; |
tass | 68:0847e35d08a6 | 96 | } |
tass | 68:0847e35d08a6 | 97 | |
tass | 68:0847e35d08a6 | 98 | |
tass | 68:0847e35d08a6 | 99 | static void zmq_connector_add(ZMQ z, struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 100 | { |
TASS Belgium NV |
131:4758606c9316 | 101 | zc->next = z->subs; |
TASS Belgium NV |
131:4758606c9316 | 102 | z->subs = zc; |
TASS Belgium NV |
131:4758606c9316 | 103 | zc->parent = z; |
TASS Belgium NV |
131:4758606c9316 | 104 | dbg("Added connector %p, sock is %p\n", zc, zc->sock); |
tass | 68:0847e35d08a6 | 105 | } |
tass | 68:0847e35d08a6 | 106 | |
tass | 68:0847e35d08a6 | 107 | static void zmq_connector_del(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 108 | { |
TASS Belgium NV |
131:4758606c9316 | 109 | ZMQ z = zc->parent; |
TASS Belgium NV |
131:4758606c9316 | 110 | if(z) { |
TASS Belgium NV |
131:4758606c9316 | 111 | struct zmq_connector *el = z->subs, *prev = NULL; /* el = pointer to linked list */ |
TASS Belgium NV |
131:4758606c9316 | 112 | while(el) { |
TASS Belgium NV |
131:4758606c9316 | 113 | if (el == zc) { /* did we find the connector that we want to delete? */ |
TASS Belgium NV |
131:4758606c9316 | 114 | if (prev) /* was there a previous list item? */ |
TASS Belgium NV |
131:4758606c9316 | 115 | prev->next = zc->next; /* link the linked list again */ |
TASS Belgium NV |
131:4758606c9316 | 116 | else |
TASS Belgium NV |
131:4758606c9316 | 117 | z->subs = zc->next; /* we were at the beginning of the list */ |
TASS Belgium NV |
131:4758606c9316 | 118 | |
TASS Belgium NV |
131:4758606c9316 | 119 | break; |
TASS Belgium NV |
131:4758606c9316 | 120 | } |
TASS Belgium NV |
131:4758606c9316 | 121 | |
TASS Belgium NV |
131:4758606c9316 | 122 | prev = el; |
TASS Belgium NV |
131:4758606c9316 | 123 | el = el->next; |
TASS Belgium NV |
131:4758606c9316 | 124 | } |
tass | 68:0847e35d08a6 | 125 | } |
TASS Belgium NV |
131:4758606c9316 | 126 | |
TASS Belgium NV |
131:4758606c9316 | 127 | pico_socket_close(zc->sock); |
TASS Belgium NV |
131:4758606c9316 | 128 | pico_free(zc); |
tass | 68:0847e35d08a6 | 129 | } |
tass | 68:0847e35d08a6 | 130 | |
TASS Belgium NV |
131:4758606c9316 | 131 | static void zmq_check_state(ZMQ z) |
tass | 68:0847e35d08a6 | 132 | { |
TASS Belgium NV |
131:4758606c9316 | 133 | struct zmq_connector *c = z->subs; |
TASS Belgium NV |
131:4758606c9316 | 134 | enum zmq_state default_state, option_state; |
TASS Belgium NV |
131:4758606c9316 | 135 | if ((z->state != ST_RDY) && (z->state != ST_BUSY)) |
TASS Belgium NV |
131:4758606c9316 | 136 | return; |
TASS Belgium NV |
131:4758606c9316 | 137 | |
TASS Belgium NV |
131:4758606c9316 | 138 | if (z->role == ROLE_SUBSCRIBER) { |
TASS Belgium NV |
131:4758606c9316 | 139 | default_state = ST_RDY; |
TASS Belgium NV |
131:4758606c9316 | 140 | option_state = ST_BUSY; |
TASS Belgium NV |
131:4758606c9316 | 141 | } else { |
TASS Belgium NV |
131:4758606c9316 | 142 | default_state = ST_BUSY; |
TASS Belgium NV |
131:4758606c9316 | 143 | option_state = ST_RDY; |
tass | 68:0847e35d08a6 | 144 | } |
TASS Belgium NV |
131:4758606c9316 | 145 | |
TASS Belgium NV |
131:4758606c9316 | 146 | z->state = default_state; |
TASS Belgium NV |
131:4758606c9316 | 147 | while(c) { |
TASS Belgium NV |
131:4758606c9316 | 148 | if (c->state == option_state) { |
TASS Belgium NV |
131:4758606c9316 | 149 | z->state = option_state; |
TASS Belgium NV |
131:4758606c9316 | 150 | return; |
TASS Belgium NV |
131:4758606c9316 | 151 | } |
TASS Belgium NV |
131:4758606c9316 | 152 | |
TASS Belgium NV |
131:4758606c9316 | 153 | c = c->next; |
TASS Belgium NV |
131:4758606c9316 | 154 | } |
tass | 68:0847e35d08a6 | 155 | } |
tass | 68:0847e35d08a6 | 156 | |
tass | 68:0847e35d08a6 | 157 | |
tass | 68:0847e35d08a6 | 158 | static void zmq_hs_connected(struct zmq_connector *z) |
tass | 68:0847e35d08a6 | 159 | { |
TASS Belgium NV |
131:4758606c9316 | 160 | /* v2 signature */ |
TASS Belgium NV |
131:4758606c9316 | 161 | uint8_t my_signature[14] = { |
TASS Belgium NV |
131:4758606c9316 | 162 | 0xff, 0, 0, 0, 0, 0, 0, 0, 1, 0x7f, 1, 1, 0, 0 |
TASS Belgium NV |
131:4758606c9316 | 163 | }; |
tass | 68:0847e35d08a6 | 164 | |
TASS Belgium NV |
131:4758606c9316 | 165 | /* uint8_t my_ver[2] = {MY_VERSION, 0}; */ |
TASS Belgium NV |
131:4758606c9316 | 166 | /* uint8_t my_greeting[52] = {'N','U','L','L', 0}; */ |
tass | 68:0847e35d08a6 | 167 | |
TASS Belgium NV |
131:4758606c9316 | 168 | pico_socket_write(z->sock, my_signature, 14); |
TASS Belgium NV |
131:4758606c9316 | 169 | /* pico_socket_write(z->sock, my_ver, 2); */ |
tass | 68:0847e35d08a6 | 170 | |
TASS Belgium NV |
131:4758606c9316 | 171 | /* if (MY_VERSION > 2) */ |
TASS Belgium NV |
131:4758606c9316 | 172 | /* pico_socket_write(z->sock, my_greeting, 52); */ |
tass | 68:0847e35d08a6 | 173 | |
TASS Belgium NV |
131:4758606c9316 | 174 | z->state = ST_SIGNATURE; |
TASS Belgium NV |
131:4758606c9316 | 175 | /* z->state = ST_RDY; */ |
tass | 68:0847e35d08a6 | 176 | } |
TASS Belgium NV |
131:4758606c9316 | 177 | |
tass | 68:0847e35d08a6 | 178 | static void zmq_hs_signature(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 179 | { |
TASS Belgium NV |
131:4758606c9316 | 180 | uint8_t incoming[20]; |
TASS Belgium NV |
131:4758606c9316 | 181 | int ret; |
TASS Belgium NV |
131:4758606c9316 | 182 | |
TASS Belgium NV |
131:4758606c9316 | 183 | ret = pico_socket_read(zc->sock, incoming, 14); |
TASS Belgium NV |
131:4758606c9316 | 184 | if (zc->bytes_received == 0 && ret > 0 && incoming[0] != 0xFF) { |
TASS Belgium NV |
131:4758606c9316 | 185 | /* dbg("Received invalid signature: [0]!=0xFF\n"); */ |
TASS Belgium NV |
131:4758606c9316 | 186 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 187 | } |
tass | 68:0847e35d08a6 | 188 | |
TASS Belgium NV |
131:4758606c9316 | 189 | zc->bytes_received = (uint8_t)(zc->bytes_received + ret); |
TASS Belgium NV |
131:4758606c9316 | 190 | if (zc->bytes_received < 14) { |
TASS Belgium NV |
131:4758606c9316 | 191 | /* dbg("Waiting for the rest of the sig - got %u bytes\n",zc->bytes_received); */ |
TASS Belgium NV |
131:4758606c9316 | 192 | return; |
TASS Belgium NV |
131:4758606c9316 | 193 | } |
TASS Belgium NV |
131:4758606c9316 | 194 | |
TASS Belgium NV |
131:4758606c9316 | 195 | /* dbg("Valid signature received. len = %d, first byte: %02x\n", ret, incoming[0]); */ |
TASS Belgium NV |
131:4758606c9316 | 196 | zc->state = ST_RDY; |
tass | 68:0847e35d08a6 | 197 | } |
TASS Belgium NV |
131:4758606c9316 | 198 | |
tass | 68:0847e35d08a6 | 199 | static void zmq_hs_version(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 200 | { |
TASS Belgium NV |
131:4758606c9316 | 201 | uint8_t incoming[20]; |
TASS Belgium NV |
131:4758606c9316 | 202 | int ret; |
TASS Belgium NV |
131:4758606c9316 | 203 | ret = pico_socket_read(zc->sock, incoming, 2); |
TASS Belgium NV |
131:4758606c9316 | 204 | if (ret < 0) { |
TASS Belgium NV |
131:4758606c9316 | 205 | dbg("Cannot exchange valid version information. Read returned -1\n"); |
TASS Belgium NV |
131:4758606c9316 | 206 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 207 | return; |
TASS Belgium NV |
131:4758606c9316 | 208 | } |
TASS Belgium NV |
131:4758606c9316 | 209 | |
TASS Belgium NV |
131:4758606c9316 | 210 | if (ret == 0) |
TASS Belgium NV |
131:4758606c9316 | 211 | return; |
TASS Belgium NV |
131:4758606c9316 | 212 | |
TASS Belgium NV |
131:4758606c9316 | 213 | /* Version check? |
TASS Belgium NV |
131:4758606c9316 | 214 | if (incoming[0] != 3) { |
tass | 68:0847e35d08a6 | 215 | dbg("Version %d.x not supported by this publisher\n", incoming[0]); |
tass | 68:0847e35d08a6 | 216 | zmq_connector_del(zc); |
tass | 68:0847e35d08a6 | 217 | return; |
TASS Belgium NV |
131:4758606c9316 | 218 | } |
TASS Belgium NV |
131:4758606c9316 | 219 | dbg("Subscriber is using version 3. Good!\n"); |
TASS Belgium NV |
131:4758606c9316 | 220 | */ |
TASS Belgium NV |
131:4758606c9316 | 221 | dbg("Subscriber is using version %d. Good!\n", incoming[0]); |
TASS Belgium NV |
131:4758606c9316 | 222 | if (incoming[0] == 3) |
TASS Belgium NV |
131:4758606c9316 | 223 | zc->state = ST_GREETING; |
TASS Belgium NV |
131:4758606c9316 | 224 | else |
TASS Belgium NV |
131:4758606c9316 | 225 | zc->state = ST_RDY; |
tass | 68:0847e35d08a6 | 226 | } |
TASS Belgium NV |
131:4758606c9316 | 227 | |
tass | 68:0847e35d08a6 | 228 | static void zmq_hs_greeting(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 229 | { |
TASS Belgium NV |
131:4758606c9316 | 230 | uint8_t incoming[64]; |
TASS Belgium NV |
131:4758606c9316 | 231 | int ret; |
TASS Belgium NV |
131:4758606c9316 | 232 | ret = pico_socket_read(zc->sock, incoming, 64); |
TASS Belgium NV |
131:4758606c9316 | 233 | dbg("zmq_socket_read in greeting returned %d\n", ret); |
TASS Belgium NV |
131:4758606c9316 | 234 | if (ret == 0) |
TASS Belgium NV |
131:4758606c9316 | 235 | return; |
TASS Belgium NV |
131:4758606c9316 | 236 | |
TASS Belgium NV |
131:4758606c9316 | 237 | if (ret < 0) { |
TASS Belgium NV |
131:4758606c9316 | 238 | dbg("Cannot retrieve valid greeting\n"); |
TASS Belgium NV |
131:4758606c9316 | 239 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 240 | return; |
TASS Belgium NV |
131:4758606c9316 | 241 | } |
TASS Belgium NV |
131:4758606c9316 | 242 | |
TASS Belgium NV |
131:4758606c9316 | 243 | zc->state = ST_RDY; |
TASS Belgium NV |
131:4758606c9316 | 244 | zmq_check_state(zc->parent); |
TASS Belgium NV |
131:4758606c9316 | 245 | dbg("Paired. Sending Ready.\n"); |
TASS Belgium NV |
131:4758606c9316 | 246 | pico_socket_write(zc->sock, "READY ", 8); |
tass | 68:0847e35d08a6 | 247 | } |
tass | 68:0847e35d08a6 | 248 | |
tass | 68:0847e35d08a6 | 249 | static void zmq_hs_rdy(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 250 | { |
tass | 68:0847e35d08a6 | 251 | int ret; |
tass | 68:0847e35d08a6 | 252 | uint8_t incoming[258]; |
tass | 68:0847e35d08a6 | 253 | if (zc->role == ROLE_SUBSCRIBER) |
TASS Belgium NV |
131:4758606c9316 | 254 | return; |
TASS Belgium NV |
131:4758606c9316 | 255 | |
tass | 68:0847e35d08a6 | 256 | ret = pico_socket_read(zc->sock, incoming, 258); |
tass | 68:0847e35d08a6 | 257 | dbg("Got %d bytes from subscriber whilst in rdy state.\n", ret); |
tass | 68:0847e35d08a6 | 258 | } |
tass | 68:0847e35d08a6 | 259 | |
tass | 68:0847e35d08a6 | 260 | static void zmq_hs_busy(struct zmq_connector *zc) |
tass | 68:0847e35d08a6 | 261 | { |
TASS Belgium NV |
131:4758606c9316 | 262 | int was_busy = 0; |
TASS Belgium NV |
131:4758606c9316 | 263 | if (zc->parent->state == ST_BUSY) |
TASS Belgium NV |
131:4758606c9316 | 264 | was_busy = 1; |
TASS Belgium NV |
131:4758606c9316 | 265 | |
TASS Belgium NV |
131:4758606c9316 | 266 | zmq_check_state(zc->parent); |
TASS Belgium NV |
131:4758606c9316 | 267 | if (was_busy && (zc->parent->state == ST_RDY) && zc->parent->ready) |
TASS Belgium NV |
131:4758606c9316 | 268 | zc->parent->ready(zc->parent); |
tass | 68:0847e35d08a6 | 269 | } |
TASS Belgium NV |
131:4758606c9316 | 270 | |
TASS Belgium NV |
131:4758606c9316 | 271 | static void (*zmq_hs_cb[])(struct zmq_connector *) = { |
tass | 68:0847e35d08a6 | 272 | NULL, |
tass | 68:0847e35d08a6 | 273 | zmq_hs_connected, |
tass | 68:0847e35d08a6 | 274 | zmq_hs_signature, |
tass | 68:0847e35d08a6 | 275 | zmq_hs_version, |
tass | 68:0847e35d08a6 | 276 | zmq_hs_greeting, |
tass | 68:0847e35d08a6 | 277 | zmq_hs_rdy, |
tass | 68:0847e35d08a6 | 278 | zmq_hs_busy |
tass | 68:0847e35d08a6 | 279 | }; |
tass | 68:0847e35d08a6 | 280 | |
tass | 68:0847e35d08a6 | 281 | |
tass | 68:0847e35d08a6 | 282 | static void cb_tcp0mq(uint16_t ev, struct pico_socket *s) |
tass | 68:0847e35d08a6 | 283 | { |
TASS Belgium NV |
131:4758606c9316 | 284 | struct pico_ip4 orig; |
TASS Belgium NV |
131:4758606c9316 | 285 | uint16_t port; |
TASS Belgium NV |
131:4758606c9316 | 286 | char peer[30]; |
TASS Belgium NV |
131:4758606c9316 | 287 | struct zmq_connector *z_a, *zc; |
TASS Belgium NV |
131:4758606c9316 | 288 | ZMQ z = ZMTP(s); |
TASS Belgium NV |
131:4758606c9316 | 289 | |
TASS Belgium NV |
131:4758606c9316 | 290 | /* Publisher. Accepting new subscribers */ |
TASS Belgium NV |
131:4758606c9316 | 291 | if (z) { |
TASS Belgium NV |
131:4758606c9316 | 292 | if (ev & PICO_SOCK_EV_CONN) { |
TASS Belgium NV |
131:4758606c9316 | 293 | z_a = pico_zalloc(sizeof(struct zmq_socket)); |
TASS Belgium NV |
131:4758606c9316 | 294 | if (z_a == NULL) |
TASS Belgium NV |
131:4758606c9316 | 295 | return; |
TASS Belgium NV |
131:4758606c9316 | 296 | |
TASS Belgium NV |
131:4758606c9316 | 297 | z_a->sock = pico_socket_accept(s, &orig, &port); |
TASS Belgium NV |
131:4758606c9316 | 298 | pico_ipv4_to_string(peer, orig.addr); |
TASS Belgium NV |
131:4758606c9316 | 299 | dbg("tcp0mq> Connection requested by %s:%u.\n", peer, short_be(port)); |
TASS Belgium NV |
131:4758606c9316 | 300 | if (z->state == ST_OPEN) { |
TASS Belgium NV |
131:4758606c9316 | 301 | dbg("tcp0mq> Accepted connection! New subscriber on sock %p.\n", z_a->sock); |
TASS Belgium NV |
131:4758606c9316 | 302 | zmq_connector_add(z, z_a); |
TASS Belgium NV |
131:4758606c9316 | 303 | z_a->role = ROLE_PUBLISHER; |
TASS Belgium NV |
131:4758606c9316 | 304 | z_a->state = ST_CONNECTED; |
TASS Belgium NV |
131:4758606c9316 | 305 | zmq_hs_connected(z_a); |
TASS Belgium NV |
131:4758606c9316 | 306 | } else { |
TASS Belgium NV |
131:4758606c9316 | 307 | dbg("tcp0mq> Server busy, connection rejected\n"); |
TASS Belgium NV |
131:4758606c9316 | 308 | pico_socket_close(z_a->sock); |
TASS Belgium NV |
131:4758606c9316 | 309 | } |
TASS Belgium NV |
131:4758606c9316 | 310 | } |
TASS Belgium NV |
131:4758606c9316 | 311 | |
tass | 68:0847e35d08a6 | 312 | return; |
tass | 68:0847e35d08a6 | 313 | } |
tass | 68:0847e35d08a6 | 314 | |
TASS Belgium NV |
131:4758606c9316 | 315 | zc = find_subscriber(s); |
TASS Belgium NV |
131:4758606c9316 | 316 | if (!zc) { |
TASS Belgium NV |
131:4758606c9316 | 317 | dbg("Cannot find subscriber with socket %p, ev = %d!\n", s, ev); |
TASS Belgium NV |
131:4758606c9316 | 318 | /* pico_socket_close(s); */ |
TASS Belgium NV |
131:4758606c9316 | 319 | return; |
TASS Belgium NV |
131:4758606c9316 | 320 | } |
tass | 68:0847e35d08a6 | 321 | |
TASS Belgium NV |
131:4758606c9316 | 322 | if ((ev & PICO_SOCK_EV_CONN) && zc->role == ROLE_SUBSCRIBER && zc->state == ST_OPEN) |
TASS Belgium NV |
131:4758606c9316 | 323 | { |
TASS Belgium NV |
131:4758606c9316 | 324 | zc->state = ST_CONNECTED; |
TASS Belgium NV |
131:4758606c9316 | 325 | zmq_hs_connected(zc); |
TASS Belgium NV |
131:4758606c9316 | 326 | } |
tass | 68:0847e35d08a6 | 327 | |
tass | 68:0847e35d08a6 | 328 | |
TASS Belgium NV |
131:4758606c9316 | 329 | if (ev & PICO_SOCK_EV_RD) { |
TASS Belgium NV |
131:4758606c9316 | 330 | if (zmq_hs_cb[zc->state]) |
TASS Belgium NV |
131:4758606c9316 | 331 | zmq_hs_cb[zc->state](zc); |
TASS Belgium NV |
131:4758606c9316 | 332 | } |
TASS Belgium NV |
131:4758606c9316 | 333 | |
TASS Belgium NV |
131:4758606c9316 | 334 | if ((ev & PICO_SOCK_EV_WR) && zc->parent && (zc->parent->role == ROLE_PUBLISHER) && (zc->state == ST_BUSY)) { |
TASS Belgium NV |
131:4758606c9316 | 335 | if (zmq_hs_cb[zc->state]) |
TASS Belgium NV |
131:4758606c9316 | 336 | zmq_hs_cb[zc->state](zc); |
TASS Belgium NV |
131:4758606c9316 | 337 | } |
TASS Belgium NV |
131:4758606c9316 | 338 | |
tass | 68:0847e35d08a6 | 339 | |
TASS Belgium NV |
131:4758606c9316 | 340 | if (ev & PICO_SOCK_EV_FIN) { |
TASS Belgium NV |
131:4758606c9316 | 341 | dbg("tcp0mq> Connection closed.\n"); |
TASS Belgium NV |
131:4758606c9316 | 342 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 343 | } |
TASS Belgium NV |
131:4758606c9316 | 344 | |
TASS Belgium NV |
131:4758606c9316 | 345 | if (ev & PICO_SOCK_EV_ERR) { |
TASS Belgium NV |
131:4758606c9316 | 346 | dbg("tcp0mq> Socket Error received: %s. Bailing out.\n", strerror(pico_err)); |
TASS Belgium NV |
131:4758606c9316 | 347 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 348 | } |
TASS Belgium NV |
131:4758606c9316 | 349 | |
TASS Belgium NV |
131:4758606c9316 | 350 | if (ev & PICO_SOCK_EV_CLOSE) { |
TASS Belgium NV |
131:4758606c9316 | 351 | dbg("tcp0mq> event close\n"); |
TASS Belgium NV |
131:4758606c9316 | 352 | zmq_connector_del(zc); |
TASS Belgium NV |
131:4758606c9316 | 353 | } |
TASS Belgium NV |
131:4758606c9316 | 354 | |
tass | 68:0847e35d08a6 | 355 | } |
tass | 68:0847e35d08a6 | 356 | |
tass | 68:0847e35d08a6 | 357 | ZMQ zmq_subscriber(void (*cb)(ZMQ z)) |
tass | 68:0847e35d08a6 | 358 | { |
TASS Belgium NV |
131:4758606c9316 | 359 | ZMQ z = pico_zalloc(sizeof(struct zmq_socket)); |
TASS Belgium NV |
131:4758606c9316 | 360 | if (!z) { |
TASS Belgium NV |
131:4758606c9316 | 361 | pico_err = PICO_ERR_ENOMEM; |
TASS Belgium NV |
131:4758606c9316 | 362 | return NULL; |
TASS Belgium NV |
131:4758606c9316 | 363 | } |
TASS Belgium NV |
131:4758606c9316 | 364 | |
TASS Belgium NV |
131:4758606c9316 | 365 | z->state = ST_BUSY; |
TASS Belgium NV |
131:4758606c9316 | 366 | z->ready = cb; |
TASS Belgium NV |
131:4758606c9316 | 367 | z->role = ROLE_SUBSCRIBER; |
TASS Belgium NV |
131:4758606c9316 | 368 | pico_tree_insert(&zmq_sockets, z); |
TASS Belgium NV |
131:4758606c9316 | 369 | return z; |
tass | 68:0847e35d08a6 | 370 | } |
tass | 68:0847e35d08a6 | 371 | |
TASS Belgium NV |
131:4758606c9316 | 372 | int zmq_connect(ZMQ z, char *address, uint16_t port) |
tass | 68:0847e35d08a6 | 373 | { |
TASS Belgium NV |
131:4758606c9316 | 374 | struct pico_ip4 ip = { |
TASS Belgium NV |
131:4758606c9316 | 375 | 0 |
TASS Belgium NV |
131:4758606c9316 | 376 | }; |
TASS Belgium NV |
131:4758606c9316 | 377 | struct zmq_connector *z_c; |
TASS Belgium NV |
131:4758606c9316 | 378 | uint8_t sockopts = 1; |
TASS Belgium NV |
131:4758606c9316 | 379 | if (pico_string_to_ipv4(address, &ip.addr) < 0) { |
TASS Belgium NV |
131:4758606c9316 | 380 | dbg("FIXME!! I need to synchronize with the dns client to get to my publisher :(\n"); |
TASS Belgium NV |
131:4758606c9316 | 381 | return -1; |
TASS Belgium NV |
131:4758606c9316 | 382 | } |
tass | 68:0847e35d08a6 | 383 | |
TASS Belgium NV |
131:4758606c9316 | 384 | z_c = pico_zalloc(sizeof(struct zmq_connector)); |
TASS Belgium NV |
131:4758606c9316 | 385 | if (!z_c) |
TASS Belgium NV |
131:4758606c9316 | 386 | return -1; |
TASS Belgium NV |
131:4758606c9316 | 387 | |
TASS Belgium NV |
131:4758606c9316 | 388 | z_c->role = ROLE_SUBSCRIBER; |
TASS Belgium NV |
131:4758606c9316 | 389 | z_c->state = ST_OPEN; |
TASS Belgium NV |
131:4758606c9316 | 390 | z_c->sock = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); |
TASS Belgium NV |
131:4758606c9316 | 391 | if (!z_c->sock) { |
TASS Belgium NV |
131:4758606c9316 | 392 | pico_free(z_c); |
TASS Belgium NV |
131:4758606c9316 | 393 | return -1; |
TASS Belgium NV |
131:4758606c9316 | 394 | } |
TASS Belgium NV |
131:4758606c9316 | 395 | |
TASS Belgium NV |
131:4758606c9316 | 396 | pico_socket_setoption(z_c->sock, PICO_TCP_NODELAY, &sockopts); |
TASS Belgium NV |
131:4758606c9316 | 397 | if (pico_socket_connect(z_c->sock, &ip, short_be(port)) < 0) |
TASS Belgium NV |
131:4758606c9316 | 398 | return -1; |
TASS Belgium NV |
131:4758606c9316 | 399 | |
TASS Belgium NV |
131:4758606c9316 | 400 | zmq_connector_add(z, z_c); |
TASS Belgium NV |
131:4758606c9316 | 401 | return 0; |
tass | 68:0847e35d08a6 | 402 | } |
tass | 68:0847e35d08a6 | 403 | |
tass | 68:0847e35d08a6 | 404 | ZMQ zmq_publisher(uint16_t _port, void (*cb)(ZMQ z)) |
tass | 68:0847e35d08a6 | 405 | { |
TASS Belgium NV |
131:4758606c9316 | 406 | struct pico_socket *s; |
TASS Belgium NV |
131:4758606c9316 | 407 | struct pico_ip4 inaddr_any = { |
TASS Belgium NV |
131:4758606c9316 | 408 | 0 |
TASS Belgium NV |
131:4758606c9316 | 409 | }; |
TASS Belgium NV |
131:4758606c9316 | 410 | uint8_t sockopts = 1; |
TASS Belgium NV |
131:4758606c9316 | 411 | uint16_t port = short_be(_port); |
TASS Belgium NV |
131:4758606c9316 | 412 | ZMQ z = NULL; |
TASS Belgium NV |
131:4758606c9316 | 413 | s = pico_socket_open(PICO_PROTO_IPV4, PICO_PROTO_TCP, &cb_tcp0mq); |
TASS Belgium NV |
131:4758606c9316 | 414 | if (!s) |
TASS Belgium NV |
131:4758606c9316 | 415 | return NULL; |
TASS Belgium NV |
131:4758606c9316 | 416 | |
TASS Belgium NV |
131:4758606c9316 | 417 | pico_socket_setoption(s, PICO_TCP_NODELAY, &sockopts); |
TASS Belgium NV |
131:4758606c9316 | 418 | |
TASS Belgium NV |
131:4758606c9316 | 419 | dbg("zmq_publisher: BIND\n"); |
TASS Belgium NV |
131:4758606c9316 | 420 | if (pico_socket_bind(s, &inaddr_any, &port) != 0) { |
TASS Belgium NV |
131:4758606c9316 | 421 | dbg("zmq publisher: BIND failed\n"); |
TASS Belgium NV |
131:4758606c9316 | 422 | return NULL; |
TASS Belgium NV |
131:4758606c9316 | 423 | } |
tass | 122:5b1e9de8bf7f | 424 | |
TASS Belgium NV |
131:4758606c9316 | 425 | if (pico_socket_listen(s, 2) != 0) { |
TASS Belgium NV |
131:4758606c9316 | 426 | dbg("zmq publisher: LISTEN failed\n"); |
TASS Belgium NV |
131:4758606c9316 | 427 | return NULL; |
TASS Belgium NV |
131:4758606c9316 | 428 | } |
TASS Belgium NV |
131:4758606c9316 | 429 | |
TASS Belgium NV |
131:4758606c9316 | 430 | dbg("zmq_publisher: Active and bound to local port %d\n", short_be(port)); |
tass | 68:0847e35d08a6 | 431 | |
TASS Belgium NV |
131:4758606c9316 | 432 | z = pico_zalloc(sizeof(struct zmq_socket)); |
TASS Belgium NV |
131:4758606c9316 | 433 | if (!z) { |
TASS Belgium NV |
131:4758606c9316 | 434 | pico_socket_close(s); |
TASS Belgium NV |
131:4758606c9316 | 435 | pico_err = PICO_ERR_ENOMEM; |
TASS Belgium NV |
131:4758606c9316 | 436 | return NULL; |
TASS Belgium NV |
131:4758606c9316 | 437 | } |
TASS Belgium NV |
131:4758606c9316 | 438 | |
TASS Belgium NV |
131:4758606c9316 | 439 | z->sock = s; |
TASS Belgium NV |
131:4758606c9316 | 440 | z->state = ST_OPEN; |
TASS Belgium NV |
131:4758606c9316 | 441 | z->ready = cb; |
TASS Belgium NV |
131:4758606c9316 | 442 | z->role = ROLE_PUBLISHER; |
TASS Belgium NV |
131:4758606c9316 | 443 | z->subs = NULL; |
TASS Belgium NV |
131:4758606c9316 | 444 | pico_tree_insert(&zmq_sockets, z); |
TASS Belgium NV |
131:4758606c9316 | 445 | dbg("zmq publisher created.\n"); |
TASS Belgium NV |
131:4758606c9316 | 446 | return z; |
tass | 68:0847e35d08a6 | 447 | } |
tass | 68:0847e35d08a6 | 448 | |
tass | 68:0847e35d08a6 | 449 | int zmq_send(ZMQ z, char *txt, int len) |
tass | 68:0847e35d08a6 | 450 | { |
tass | 68:0847e35d08a6 | 451 | struct zmq_msg *msg; |
tass | 68:0847e35d08a6 | 452 | struct zmq_connector *c = z->subs; |
tass | 68:0847e35d08a6 | 453 | int ret = 0; |
tass | 68:0847e35d08a6 | 454 | |
TASS Belgium NV |
131:4758606c9316 | 455 | if (!c) |
tass | 68:0847e35d08a6 | 456 | { |
tass | 68:0847e35d08a6 | 457 | dbg("no subscribers, bailing out\n"); |
tass | 68:0847e35d08a6 | 458 | return 0; /* Need at least one subscriber */ |
tass | 68:0847e35d08a6 | 459 | } |
TASS Belgium NV |
131:4758606c9316 | 460 | |
tass | 70:cd218dd180e5 | 461 | msg = pico_zalloc((size_t)(len + 2)); |
tass | 68:0847e35d08a6 | 462 | msg->flags = 4; |
tass | 68:0847e35d08a6 | 463 | msg->len = (uint8_t) len; |
TASS Belgium NV |
131:4758606c9316 | 464 | memcpy(msg->txt, txt, (size_t) len); |
tass | 68:0847e35d08a6 | 465 | |
tass | 68:0847e35d08a6 | 466 | while (c) { |
TASS Belgium NV |
131:4758606c9316 | 467 | dbg("write to %u\n", c->state); |
TASS Belgium NV |
131:4758606c9316 | 468 | if ((ST_RDY == c->state) && (pico_socket_write(c->sock, msg, len + 2) > 0)) |
TASS Belgium NV |
131:4758606c9316 | 469 | ret++; |
TASS Belgium NV |
131:4758606c9316 | 470 | |
TASS Belgium NV |
131:4758606c9316 | 471 | c = c->next; |
tass | 68:0847e35d08a6 | 472 | } |
tass | 68:0847e35d08a6 | 473 | pico_free(msg); |
tass | 68:0847e35d08a6 | 474 | return ret; |
tass | 68:0847e35d08a6 | 475 | } |
tass | 68:0847e35d08a6 | 476 | |
tass | 68:0847e35d08a6 | 477 | int zmq_recv(ZMQ z, char *txt) |
tass | 68:0847e35d08a6 | 478 | { |
TASS Belgium NV |
131:4758606c9316 | 479 | int ret; |
TASS Belgium NV |
131:4758606c9316 | 480 | struct zmq_msg msg; |
TASS Belgium NV |
131:4758606c9316 | 481 | struct zmq_connector *nxt, *c = z->subs; |
TASS Belgium NV |
131:4758606c9316 | 482 | if (z->state != ST_RDY) |
TASS Belgium NV |
131:4758606c9316 | 483 | return 0; |
TASS Belgium NV |
131:4758606c9316 | 484 | |
TASS Belgium NV |
131:4758606c9316 | 485 | while (c) { |
TASS Belgium NV |
131:4758606c9316 | 486 | nxt = c->next; |
TASS Belgium NV |
131:4758606c9316 | 487 | ret = pico_socket_read(c->sock, &msg, 2); |
TASS Belgium NV |
131:4758606c9316 | 488 | if (ret < 0) { |
TASS Belgium NV |
131:4758606c9316 | 489 | dbg("Error reading!\n"); |
TASS Belgium NV |
131:4758606c9316 | 490 | zmq_connector_del(c); |
TASS Belgium NV |
131:4758606c9316 | 491 | } else if (ret < 2) { |
TASS Belgium NV |
131:4758606c9316 | 492 | c->state = ST_BUSY; |
TASS Belgium NV |
131:4758606c9316 | 493 | } else { |
TASS Belgium NV |
131:4758606c9316 | 494 | return pico_socket_read(c->sock, txt, msg.len); |
TASS Belgium NV |
131:4758606c9316 | 495 | } |
TASS Belgium NV |
131:4758606c9316 | 496 | |
TASS Belgium NV |
131:4758606c9316 | 497 | c = nxt; |
TASS Belgium NV |
131:4758606c9316 | 498 | } |
TASS Belgium NV |
131:4758606c9316 | 499 | zmq_check_state(z); |
tass | 68:0847e35d08a6 | 500 | return 0; |
tass | 68:0847e35d08a6 | 501 | } |
tass | 68:0847e35d08a6 | 502 | |
tass | 68:0847e35d08a6 | 503 | void zmq_close(ZMQ z) |
tass | 68:0847e35d08a6 | 504 | { |
TASS Belgium NV |
131:4758606c9316 | 505 | struct zmq_connector *nxt, *c = z->subs; |
TASS Belgium NV |
131:4758606c9316 | 506 | while(c) { |
TASS Belgium NV |
131:4758606c9316 | 507 | nxt = c->next; |
TASS Belgium NV |
131:4758606c9316 | 508 | zmq_connector_del(c); |
TASS Belgium NV |
131:4758606c9316 | 509 | c = nxt; |
TASS Belgium NV |
131:4758606c9316 | 510 | } |
TASS Belgium NV |
131:4758606c9316 | 511 | pico_socket_close(z->sock); |
TASS Belgium NV |
131:4758606c9316 | 512 | pico_free(z); |
tass | 68:0847e35d08a6 | 513 | } |