Add pubsub system to edge, with one example TEST channel

This commit is contained in:
Hamish Coleman 2022-02-05 18:39:26 +00:00
parent 509a8bcd14
commit a40ed15d93
5 changed files with 215 additions and 45 deletions

View File

@ -58,15 +58,23 @@ Fields:
The maximum length of the entire line of text is 80 octets.
All request packets should generate a reply. However, this reply may simply
be an error.
### Message Type
This is a single octet that is either "r" for a read (or query) method
call or "w" for a write (or change) method call.
This is a single octet specifying the type:
- "r" for a read-only method (or one that does not need change permissions)
- "w" for a write method (or one that makes changes)
- "s" for a subscribe method to request this socket receive some events
To simplify the interface, the reply from both read and write calls to the
same method is expected to contain the same data. In the case of a write
call, the reply will contain the new state after making the requested change.
The subscribe and events message flow works with a different set of messages.
### Options
The options field is a colon separated set of options for this request. Only
@ -90,8 +98,9 @@ Where possible, the error replies will also include this tag, however some
errors occur before the tag is parsed.
The tag is not interpreted by the daemon, it is simply echoed back in all
the replies. It is expected to be a short string that the client chooses
to be unique amongst all recent or still outstanding requests.
the replies. It is expected to be a short string that the client knows
will be unique amongst all recent, still outstanding or subscription requests
on a given socket.
One possible client implementation is a number between 0 and 999, incremented
for each request and wrapping around to zero when it is greater than 999.
@ -128,6 +137,9 @@ e.g:
Each UDP packet in the reply is a complete and valid JSON dictionary
containing a fragment of information related to the entire reply.
Reply packets are generated both in response to requests and whenever
an event is published to a subscribed channel.
### Common metadata
There are two keys in each dictionary containing metadata. First
@ -177,6 +189,47 @@ packets will be required.
e.g:
`{"_tag":"108","_type":"row","mode":"p2p","ip4addr":"10.135.98.84","macaddr":"86:56:21:E4:AA:39","sockaddr":"192.168.7.191:41701","desc":"client4","lastseen":1584682200}`
### `_type: subscribed`
Signals that the subscription request has been successfully completed.
Any future events on the requested channel will be asynchronously sent
as `event` packets using the same tag as the subscribe request.
### `_type: unsubscribed`
Only one management client can be subscribed to any given event topic, so if
another subscribe request arrives, the older client will be sent this message
to let them know that they have been replaced.
(In the future, this may also be sent as a reply to a explicit unsubscribe
request)
### `_type: replacing`
If a new subscription request will replace an existing one, this message is
sent to the new client to inform them that they have replaced an older
connection.
### `_type: event`
Asynchronous events will arrive with this message type, using the same tag as
the original subscribe request. Just like with the `row` packets, the non
metadata contents are entirely defined by the topic and the specific n2n
version.
## Subscribe API
A client can subscribe to events using a request with the type of "s".
Once a subscribe has been successfully completed, any events published
on that channel will be forwarded to the client.
Only one management client can be subscribed to any given event topic,
with newer subscriptions replacing older ones.
The special channel "debug" will receive copies of all events published.
Note that this is for debugging of events and the packets may not have
the same tag as the debug subscription.
## Authentication
Some API requests will make global changes to the running daemon and may

View File

@ -286,4 +286,6 @@ const char* compression_str (uint8_t cmpr);
const char* transop_str (enum n2n_transform tr);
void readFromMgmtSocket (n2n_edge_t *eee);
void mgmt_event_post (enum n2n_event_topic topic, void *data);
#endif /* _N2N_H_ */

View File

@ -121,6 +121,11 @@ enum sn_purge {SN_PURGEABLE = 0, SN_UNPURGEABLE = 1};
#define N2N_EDGE_MGMT_PORT 5644
#define N2N_SN_MGMT_PORT 5645
enum n2n_event_topic {
N2N_EVENT_DEBUG = 0,
N2N_EVENT_TEST = 1,
};
#define N2N_MGMT_PASSWORD "n2n" /* default password for management port access (so far, json only) */

View File

@ -41,23 +41,37 @@ class JsonUDP():
def _rx(self, tagstr):
"""Wait for rx packets"""
# TODO: there are no timeouts with any of the recv calls
data, _ = self.sock.recvfrom(1024)
data = json.loads(data.decode('utf8'))
seen_begin = False
while not seen_begin:
# TODO: there are no timeouts with any of the recv calls
data, _ = self.sock.recvfrom(1024)
data = json.loads(data.decode('utf8'))
# TODO: We assume the first packet we get will be tagged for us
# and be either an "error" or a "begin"
assert(data['_tag'] == tagstr)
# TODO: We assume the first packet we get will be tagged for us
assert(data['_tag'] == tagstr)
if data['_type'] == 'error':
raise ValueError('Error: {}'.format(data['error']))
if data['_type'] == 'error':
raise ValueError('Error: {}'.format(data['error']))
assert(data['_type'] == 'begin')
if data['_type'] == 'replacing':
# a signal that we have evicted an earlier subscribe
continue
# Ideally, we would confirm that this is our "begin", but that
# would need the cmd passed into this method, and that would
# probably require parsing the cmdline passed to us :-(
# assert(data['cmd'] == cmd)
if data['_type'] == 'subscribe':
return True
if data['_type'] == 'begin':
seen_begin = True
# Ideally, we would confirm that this is our "begin", but that
# would need the cmd passed into this method, and that would
# probably require parsing the cmdline passed to us :-(
# assert(data['cmd'] == cmd)
continue
raise ValueError('Unknown data type {} from '
'edge'.format(data['_type']))
result = list()
error = None
@ -105,6 +119,21 @@ class JsonUDP():
def write(self, cmdline):
return self._call('w', cmdline)
def sub(self, cmdline):
return self._call('s', cmdline)
def readevent(self):
self.sock.settimeout(3600)
data, _ = self.sock.recvfrom(1024)
data = json.loads(data.decode('utf8'))
# assert(data['_tag'] == tagstr)
assert(data['_type'] == 'event')
del data['_tag']
del data['_type']
return data
def str_table(rows, columns, orderby):
"""Given an array of dicts, do a simple table print"""
@ -203,8 +232,17 @@ def subcmd_default(rpc, args):
cmdline = ' '.join([args.cmd] + args.args)
if args.write:
rows = rpc.write(cmdline)
else:
elif args.read:
rows = rpc.read(cmdline)
elif args.sub:
if not rpc.sub(cmdline):
raise ValueError('Could not subscribe')
while True:
event = rpc.readevent()
# FIXME: violates layering..
print(json.dumps(event, sort_keys=True, indent=4))
else:
raise ValueError('Unknown request type')
return json.dumps(rows, sort_keys=True, indent=4)
@ -228,6 +266,8 @@ def main():
group.add_argument('--write', action='store_true',
help='Make a write request (only to non pretty'
'printed cmds)')
group.add_argument('--sub', action='store_true',
help='Make a subscribe request')
ap.add_argument('cmd', action='store',
help='Command to run (try "help" for list)')
@ -236,6 +276,9 @@ def main():
args = ap.parse_args()
if not args.read and not args.write and not args.sub:
args.read = True
if args.raw or (args.cmd not in subcmds):
func = subcmd_default
else:

View File

@ -61,7 +61,7 @@ typedef struct mgmt_handler {
* Event topic names are defined in this structure
*/
typedef struct mgmt_events {
int topic; // topic number define
enum n2n_event_topic topic;
char *cmd;
char *help;
} mgmt_events_t;
@ -80,22 +80,26 @@ typedef struct mgmt_events {
} \
} while(0)
static void send_reply (mgmt_req_t *req, strbuf_t *buf, size_t msg_len) {
// TODO: error handling
sendto(req->eee->udp_mgmt_sock, buf->str, msg_len, 0,
ssize_t send_reply (mgmt_req_t *req, strbuf_t *buf, size_t msg_len) {
// TODO: better error handling (counters?)
return sendto(req->eee->udp_mgmt_sock, buf->str, msg_len, 0,
(struct sockaddr *) &req->sender_sock, sizeof(struct sockaddr_in));
}
size_t gen_json_1str (strbuf_t *buf, char *tag, char *_type, char *key, char *val) {
return snprintf(buf->str, buf->size,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"%s\","
"\"%s\":\"%s\"}\n",
tag,
_type,
key,
val);
}
static void send_json_1str (mgmt_req_t *req, strbuf_t *buf, char *_type, char *key, char *val) {
size_t msg_len = snprintf(buf->str, buf->size,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"%s\","
"\"%s\":\"%s\"}\n",
req->tag,
_type,
key,
val);
size_t msg_len = gen_json_1str(buf, req->tag, _type, key, val);
send_reply(req, buf, msg_len);
}
@ -112,8 +116,14 @@ static void send_json_1uint (mgmt_req_t *req, strbuf_t *buf, char *_type, char *
send_reply(req, buf, msg_len);
}
static void event_debug (mgmt_req_t *req, strbuf_t *buf) {
send_json_1str(req, buf, "event", "test", "test");
size_t event_debug (strbuf_t *buf, char *tag, void *data) {
traceEvent(TRACE_DEBUG, "Unexpected call to event_debug");
return 0;
}
size_t event_test (strbuf_t *buf, char *tag, void *data) {
size_t msg_len = gen_json_1str(buf, tag, "event", "test", (char *)data);
return msg_len;
}
static void mgmt_error (mgmt_req_t *req, strbuf_t *buf, char *msg) {
@ -315,6 +325,12 @@ static void mgmt_packetstats (mgmt_req_t *req, strbuf_t *buf, char *argv0, char
send_reply(req, buf, msg_len);
}
static void mgmt_post_test (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) {
send_json_1str(req, buf, "row", "sending", "test");
mgmt_event_post (N2N_EVENT_TEST, argv);
}
static void mgmt_unimplemented (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) {
mgmt_error(req, buf, "unimplemented");
@ -334,25 +350,60 @@ static const mgmt_handler_t mgmt_handlers[] = {
{ .cmd = "supernodes", .help = "List current supernodes", .func = mgmt_supernodes},
{ .cmd = "timestamps", .help = "Event timestamps", .func = mgmt_timestamps},
{ .cmd = "packetstats", .help = "traffic counters", .func = mgmt_packetstats},
{ .cmd = "post.test", .help = "send a test event", .func = mgmt_post_test},
{ .cmd = "help", .flags = FLAG_WROK, .help = "Show JSON commands", .func = mgmt_help},
{ .cmd = "help.events", .help = "Show available Subscribe topics", .func = mgmt_help_events},
};
/* Current subscriber for each event topic */
static mgmt_req_t mgmt_event_subscribers[] = {
[0] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" },
[N2N_EVENT_DEBUG] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" },
[N2N_EVENT_TEST] = { .eee = NULL, .type = N2N_MGMT_UNKNOWN, .tag = "\0" },
};
/* Map topic number to function */
static const void (*mgmt_events[])(mgmt_req_t *req, strbuf_t *buf) = {
[0] = event_debug,
static const size_t (*mgmt_events[])(strbuf_t *buf, char *tag, void *data) = {
[N2N_EVENT_DEBUG] = event_debug,
[N2N_EVENT_TEST] = event_test,
};
/* Allow help and subscriptions to use topic name */
static const mgmt_events_t mgmt_event_names[] = {
{ .cmd = "debug", .topic = 0, .help = "All events - for event debugging"},
{ .cmd = "debug", .topic = N2N_EVENT_DEBUG, .help = "All events - for event debugging"},
{ .cmd = "test", .topic = N2N_EVENT_TEST, .help = "Used only by post.test"},
};
void mgmt_event_post (enum n2n_event_topic topic, void *data) {
mgmt_req_t *debug = &mgmt_event_subscribers[N2N_EVENT_DEBUG];
mgmt_req_t *sub = &mgmt_event_subscribers[topic];
if( sub->type != N2N_MGMT_SUB && debug->type != N2N_MGMT_SUB) {
// If neither of this topic or the debug topic have a subscriber
// then we dont need to do any work
return;
}
char buf_space[100];
strbuf_t *buf;
STRBUF_INIT(buf, buf_space);
char *tag;
if (sub->type == N2N_MGMT_SUB) {
tag = sub->tag;
} else {
tag = debug->tag;
}
size_t msg_len = mgmt_events[topic](buf, tag, data);
if (sub->type == N2N_MGMT_SUB) {
send_reply(sub, buf, msg_len);
}
if (debug->type == N2N_MGMT_SUB) {
send_reply(debug, buf, msg_len);
}
}
static void mgmt_help_events (mgmt_req_t *req, strbuf_t *buf, char *argv0, char *argv) {
size_t msg_len;
@ -364,11 +415,11 @@ static void mgmt_help_events (mgmt_req_t *req, strbuf_t *buf, char *argv0, char
char host[40];
char serv[6];
if(getnameinfo(
(struct sockaddr *)&sub->sender_sock, sizeof(sub->sender_sock),
host, sizeof(host),
serv, sizeof(serv),
NI_NUMERICHOST|NI_NUMERICSERV) != 0) {
if((sub->type != N2N_MGMT_SUB) ||
getnameinfo((struct sockaddr *)&sub->sender_sock, sizeof(sub->sender_sock),
host, sizeof(host),
serv, sizeof(serv),
NI_NUMERICHOST|NI_NUMERICSERV) != 0) {
host[0] = '?';
host[1] = 0;
serv[0] = '?';
@ -533,7 +584,23 @@ static void handleMgmtJson (mgmt_req_t *req, char *udp_buf, const int recvlen) {
}
if(req->type == N2N_MGMT_SUB) {
mgmt_error(req, buf, "unimplemented");
int handler;
lookup_handler(handler, mgmt_event_names, argv0);
if(handler == -1) {
mgmt_error(req, buf, "unknowntopic");
return;
}
int topic = mgmt_event_names[handler].topic;
if(mgmt_event_subscribers[topic].type == N2N_MGMT_SUB) {
send_json_1str(&mgmt_event_subscribers[topic], buf,
"unsubscribed", "topic", argv0);
send_json_1str(req, buf, "replacing", "topic", argv0);
}
memcpy(&mgmt_event_subscribers[topic], req, sizeof(*req));
send_json_1str(req, buf, "subscribe", "topic", argv0);
return;
}