From a40ed15d93030de84a4b404806f9cf1dc7965dda Mon Sep 17 00:00:00 2001 From: Hamish Coleman Date: Sat, 5 Feb 2022 18:39:26 +0000 Subject: [PATCH] Add pubsub system to edge, with one example TEST channel --- doc/ManagementAPI.md | 65 ++++++++++++++++++++--- include/n2n.h | 2 + include/n2n_define.h | 5 ++ scripts/n2n-ctl | 71 ++++++++++++++++++++----- src/edge_management.c | 117 +++++++++++++++++++++++++++++++++--------- 5 files changed, 215 insertions(+), 45 deletions(-) diff --git a/doc/ManagementAPI.md b/doc/ManagementAPI.md index 3ad0744..3f179f4 100644 --- a/doc/ManagementAPI.md +++ b/doc/ManagementAPI.md @@ -1,6 +1,6 @@ # Management API -This document is focused on the machine readable API interfaces. +This document is focused on the machine readable API interfaces. Both the edge and the supernode provide a management interface UDP port. These interfaces have some documentation on their non machine readable @@ -48,7 +48,7 @@ but this is intended for debugging. The request is a single UDP packet containing one line of text with at least three space separated fields. Any text after the third field is available for -the API method to use for additional parameters +the API method to use for additional parameters Fields: - Message Type @@ -58,15 +58,23 @@ Fields: The maximum length of the entire line of text is 80 octets. +All request packets should generate a reply. However, this reply may simply +be an error. + ### Message Type -This is a single octet that is either "r" for a read (or query) method -call or "w" for a write (or change) method call. +This is a single octet specifying the type: + +- "r" for a read-only method (or one that does not need change permissions) +- "w" for a write method (or one that makes changes) +- "s" for a subscribe method to request this socket receive some events To simplify the interface, the reply from both read and write calls to the same method is expected to contain the same data. In the case of a write call, the reply will contain the new state after making the requested change. +The subscribe and events message flow works with a different set of messages. + ### Options The options field is a colon separated set of options for this request. Only @@ -90,8 +98,9 @@ Where possible, the error replies will also include this tag, however some errors occur before the tag is parsed. The tag is not interpreted by the daemon, it is simply echoed back in all -the replies. It is expected to be a short string that the client chooses -to be unique amongst all recent or still outstanding requests. +the replies. It is expected to be a short string that the client knows +will be unique amongst all recent, still outstanding or subscription requests +on a given socket. One possible client implementation is a number between 0 and 999, incremented for each request and wrapping around to zero when it is greater than 999. @@ -128,6 +137,9 @@ e.g: Each UDP packet in the reply is a complete and valid JSON dictionary containing a fragment of information related to the entire reply. +Reply packets are generated both in response to requests and whenever +an event is published to a subscribed channel. + ### Common metadata There are two keys in each dictionary containing metadata. First @@ -177,6 +189,47 @@ packets will be required. e.g: `{"_tag":"108","_type":"row","mode":"p2p","ip4addr":"10.135.98.84","macaddr":"86:56:21:E4:AA:39","sockaddr":"192.168.7.191:41701","desc":"client4","lastseen":1584682200}` +### `_type: subscribed` + +Signals that the subscription request has been successfully completed. +Any future events on the requested channel will be asynchronously sent +as `event` packets using the same tag as the subscribe request. + +### `_type: unsubscribed` + +Only one management client can be subscribed to any given event topic, so if +another subscribe request arrives, the older client will be sent this message +to let them know that they have been replaced. + +(In the future, this may also be sent as a reply to a explicit unsubscribe +request) + +### `_type: replacing` + +If a new subscription request will replace an existing one, this message is +sent to the new client to inform them that they have replaced an older +connection. + +### `_type: event` + +Asynchronous events will arrive with this message type, using the same tag as +the original subscribe request. Just like with the `row` packets, the non +metadata contents are entirely defined by the topic and the specific n2n +version. + +## Subscribe API + +A client can subscribe to events using a request with the type of "s". +Once a subscribe has been successfully completed, any events published +on that channel will be forwarded to the client. + +Only one management client can be subscribed to any given event topic, +with newer subscriptions replacing older ones. + +The special channel "debug" will receive copies of all events published. +Note that this is for debugging of events and the packets may not have +the same tag as the debug subscription. + ## Authentication Some API requests will make global changes to the running daemon and may diff --git a/include/n2n.h b/include/n2n.h index 9ca353a..5a44b3d 100644 --- a/include/n2n.h +++ b/include/n2n.h @@ -286,4 +286,6 @@ const char* compression_str (uint8_t cmpr); const char* transop_str (enum n2n_transform tr); void readFromMgmtSocket (n2n_edge_t *eee); + +void mgmt_event_post (enum n2n_event_topic topic, void *data); #endif /* _N2N_H_ */ diff --git a/include/n2n_define.h b/include/n2n_define.h index 451dacb..0226b2b 100644 --- a/include/n2n_define.h +++ b/include/n2n_define.h @@ -121,6 +121,11 @@ enum sn_purge {SN_PURGEABLE = 0, SN_UNPURGEABLE = 1}; #define N2N_EDGE_MGMT_PORT 5644 #define N2N_SN_MGMT_PORT 5645 +enum n2n_event_topic { + N2N_EVENT_DEBUG = 0, + N2N_EVENT_TEST = 1, +}; + #define N2N_MGMT_PASSWORD "n2n" /* default password for management port access (so far, json only) */ diff --git a/scripts/n2n-ctl b/scripts/n2n-ctl index 4e63eda..84b48cb 100755 --- a/scripts/n2n-ctl +++ b/scripts/n2n-ctl @@ -41,23 +41,37 @@ class JsonUDP(): def _rx(self, tagstr): """Wait for rx packets""" - # TODO: there are no timeouts with any of the recv calls - data, _ = self.sock.recvfrom(1024) - data = json.loads(data.decode('utf8')) + seen_begin = False + while not seen_begin: + # TODO: there are no timeouts with any of the recv calls + data, _ = self.sock.recvfrom(1024) + data = json.loads(data.decode('utf8')) - # TODO: We assume the first packet we get will be tagged for us - # and be either an "error" or a "begin" - assert(data['_tag'] == tagstr) + # TODO: We assume the first packet we get will be tagged for us + assert(data['_tag'] == tagstr) - if data['_type'] == 'error': - raise ValueError('Error: {}'.format(data['error'])) + if data['_type'] == 'error': + raise ValueError('Error: {}'.format(data['error'])) - assert(data['_type'] == 'begin') + if data['_type'] == 'replacing': + # a signal that we have evicted an earlier subscribe + continue - # Ideally, we would confirm that this is our "begin", but that - # would need the cmd passed into this method, and that would - # probably require parsing the cmdline passed to us :-( - # assert(data['cmd'] == cmd) + if data['_type'] == 'subscribe': + return True + + if data['_type'] == 'begin': + seen_begin = True + + # Ideally, we would confirm that this is our "begin", but that + # would need the cmd passed into this method, and that would + # probably require parsing the cmdline passed to us :-( + # assert(data['cmd'] == cmd) + + continue + + raise ValueError('Unknown data type {} from ' + 'edge'.format(data['_type'])) result = list() error = None @@ -105,6 +119,21 @@ class JsonUDP(): def write(self, cmdline): return self._call('w', cmdline) + def sub(self, cmdline): + return self._call('s', cmdline) + + def readevent(self): + self.sock.settimeout(3600) + + data, _ = self.sock.recvfrom(1024) + data = json.loads(data.decode('utf8')) + # assert(data['_tag'] == tagstr) + assert(data['_type'] == 'event') + + del data['_tag'] + del data['_type'] + return data + def str_table(rows, columns, orderby): """Given an array of dicts, do a simple table print""" @@ -203,8 +232,17 @@ def subcmd_default(rpc, args): cmdline = ' '.join([args.cmd] + args.args) if args.write: rows = rpc.write(cmdline) - else: + elif args.read: rows = rpc.read(cmdline) + elif args.sub: + if not rpc.sub(cmdline): + raise ValueError('Could not subscribe') + while True: + event = rpc.readevent() + # FIXME: violates layering.. + print(json.dumps(event, sort_keys=True, indent=4)) + else: + raise ValueError('Unknown request type') return json.dumps(rows, sort_keys=True, indent=4) @@ -228,6 +266,8 @@ def main(): group.add_argument('--write', action='store_true', help='Make a write request (only to non pretty' 'printed cmds)') + group.add_argument('--sub', action='store_true', + help='Make a subscribe request') ap.add_argument('cmd', action='store', help='Command to run (try "help" for list)') @@ -236,6 +276,9 @@ def main(): args = ap.parse_args() + if not args.read and not args.write and not args.sub: + args.read = True + if args.raw or (args.cmd not in subcmds): func = subcmd_default else: diff --git a/src/edge_management.c b/src/edge_management.c index 1aff29b..9e562e3 100644 --- a/src/edge_management.c +++ b/src/edge_management.c @@ -61,7 +61,7 @@ typedef struct mgmt_handler { * Event topic names are defined in this structure */ typedef struct mgmt_events { - int topic; // topic number define + enum n2n_event_topic topic; char *cmd; char *help; } mgmt_events_t; @@ -80,22 +80,26 @@ typedef struct mgmt_events { } \ } while(0) -static void send_reply (mgmt_req_t *req, strbuf_t *buf, size_t msg_len) { - // TODO: error handling - sendto(req->eee->udp_mgmt_sock, buf->str, msg_len, 0, +ssize_t send_reply (mgmt_req_t *req, strbuf_t *buf, size_t msg_len) { + // TODO: better error handling (counters?) + return sendto(req->eee->udp_mgmt_sock, buf->str, msg_len, 0, (struct sockaddr *) &req->sender_sock, sizeof(struct sockaddr_in)); } +size_t gen_json_1str (strbuf_t *buf, char *tag, char *_type, char *key, char *val) { + return snprintf(buf->str, buf->size, + "{" + "\"_tag\":\"%s\"," + "\"_type\":\"%s\"," + "\"%s\":\"%s\"}\n", + tag, + _type, + key, + val); +} + static void send_json_1str (mgmt_req_t *req, strbuf_t *buf, char *_type, char *key, char *val) { - size_t msg_len = snprintf(buf->str, buf->size, - "{" - "\"_tag\":\"%s\"," - "\"_type\":\"%s\"," - "\"%s\":\"%s\"}\n", - req->tag, - _type, - key, - val); + size_t msg_len = gen_json_1str(buf, req->tag, _type, key, val); send_reply(req, buf, msg_len); } @@ -112,8 +116,14 @@ static void send_json_1uint (mgmt_req_t *req, strbuf_t *buf, char *_type, char * send_reply(req, buf, msg_len); } -static void event_debug (mgmt_req_t *req, strbuf_t *buf) { - send_json_1str(req, buf, "event", "test", "test"); +size_t event_debug (strbuf_t *buf, char *tag, void *data) { + traceEvent(TRACE_DEBUG, "Unexpected call to event_debug"); + return 0; +} + +size_t event_test (strbuf_t *buf, char *tag, void *data) { + size_t msg_len = gen_json_1str(buf, tag, "event", "test", (char *)data); + return msg_len; } static void mgmt_error (mgmt_req_t *req, strbuf_t *buf, char *msg) { @@ -315,6 +325,12 @@ static void mgmt_packetstats (mgmt_req_t *req, strbuf_t *buf, char *argv0, char send_reply(req, buf, msg_len); } +static void mgmt_post_test (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) { + + send_json_1str(req, buf, "row", "sending", "test"); + mgmt_event_post (N2N_EVENT_TEST, argv); +} + static void mgmt_unimplemented (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) { mgmt_error(req, buf, "unimplemented"); @@ -334,25 +350,60 @@ static const mgmt_handler_t mgmt_handlers[] = { { .cmd = "supernodes", .help = "List current supernodes", .func = mgmt_supernodes}, { .cmd = "timestamps", .help = "Event timestamps", .func = mgmt_timestamps}, { .cmd = "packetstats", .help = "traffic counters", .func = mgmt_packetstats}, + { .cmd = "post.test", .help = "send a test event", .func = mgmt_post_test}, { .cmd = "help", .flags = FLAG_WROK, .help = "Show JSON commands", .func = mgmt_help}, { .cmd = "help.events", .help = "Show available Subscribe topics", .func = mgmt_help_events}, }; /* Current subscriber for each event topic */ static mgmt_req_t mgmt_event_subscribers[] = { - [0] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" }, + [N2N_EVENT_DEBUG] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" }, + [N2N_EVENT_TEST] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" }, }; /* Map topic number to function */ -static const void (*mgmt_events[])(mgmt_req_t *req, strbuf_t *buf) = { - [0] = event_debug, +static const size_t (*mgmt_events[])(strbuf_t *buf, char *tag, void *data) = { + [N2N_EVENT_DEBUG] = event_debug, + [N2N_EVENT_TEST] = event_test, }; /* Allow help and subscriptions to use topic name */ static const mgmt_events_t mgmt_event_names[] = { - { .cmd = "debug", .topic = 0, .help = "All events - for event debugging"}, + { .cmd = "debug", .topic = N2N_EVENT_DEBUG, .help = "All events - for event debugging"}, + { .cmd = "test", .topic = N2N_EVENT_TEST, .help = "Used only by post.test"}, }; +void mgmt_event_post (enum n2n_event_topic topic, void *data) { + mgmt_req_t *debug = &mgmt_event_subscribers[N2N_EVENT_DEBUG]; + mgmt_req_t *sub = &mgmt_event_subscribers[topic]; + + if( sub->type != N2N_MGMT_SUB && debug->type != N2N_MGMT_SUB) { + // If neither of this topic or the debug topic have a subscriber + // then we dont need to do any work + return; + } + + char buf_space[100]; + strbuf_t *buf; + STRBUF_INIT(buf, buf_space); + + char *tag; + if (sub->type == N2N_MGMT_SUB) { + tag = sub->tag; + } else { + tag = debug->tag; + } + + size_t msg_len = mgmt_events[topic](buf, tag, data); + + if (sub->type == N2N_MGMT_SUB) { + send_reply(sub, buf, msg_len); + } + if (debug->type == N2N_MGMT_SUB) { + send_reply(debug, buf, msg_len); + } +} + static void mgmt_help_events (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) { size_t msg_len; @@ -364,11 +415,11 @@ static void mgmt_help_events (mgmt_req_t *req, strbuf_t *buf, char *argv0, char char host[40]; char serv[6]; - if(getnameinfo( - (struct sockaddr *)&sub->sender_sock, sizeof(sub->sender_sock), - host, sizeof(host), - serv, sizeof(serv), - NI_NUMERICHOST|NI_NUMERICSERV) != 0) { + if((sub->type != N2N_MGMT_SUB) || + getnameinfo((struct sockaddr *)&sub->sender_sock, sizeof(sub->sender_sock), + host, sizeof(host), + serv, sizeof(serv), + NI_NUMERICHOST|NI_NUMERICSERV) != 0) { host[0] = '?'; host[1] = 0; serv[0] = '?'; @@ -533,7 +584,23 @@ static void handleMgmtJson (mgmt_req_t *req, char *udp_buf, const int recvlen) { } if(req->type == N2N_MGMT_SUB) { - mgmt_error(req, buf, "unimplemented"); + int handler; + lookup_handler(handler, mgmt_event_names, argv0); + if(handler == -1) { + mgmt_error(req, buf, "unknowntopic"); + return; + } + + int topic = mgmt_event_names[handler].topic; + if(mgmt_event_subscribers[topic].type == N2N_MGMT_SUB) { + send_json_1str(&mgmt_event_subscribers[topic], buf, + "unsubscribed", "topic", argv0); + send_json_1str(req, buf, "replacing", "topic", argv0); + } + + memcpy(&mgmt_event_subscribers[topic], req, sizeof(*req)); + + send_json_1str(req, buf, "subscribe", "topic", argv0); return; }