JSON Reply Management API - feature parity with old management interfaces (#861)

* Ensure that recent code additions pass the linter

* Include some of the more obviously correct lint fixes to edge_utils.c

* Refactor edge JSON api into its own source file

* Use shorter names for static management functions

* Implement a JSON RPC way of managing the verbosity

* Tidy up help display in n2nctl script

* Make note of issue with implementing the stop command

* Implement a JSON RPC call to fetch current community

* Make n2nhttpd time value be more self-contained

* Make n2nhttpd order more closely match the existing management stats output

* Wire up status page to the verbosity setting

* Add JSON versions of the remainder of the edge management stats

* Add new file to cmake

* Properly define management handler

* Only update the last updated timestamp after a successful data fetch

* Function and types definition cleanup

* Force correct type for python scripts mgmt port

* Implement initial JSON API for supernode

* Fix whitespace error

* Use helper function for rendering peers ip4 address

* Proxy the auth requirement back out to the http client, allowing normal http auth to be used

* Ensure that we do not leak the federation community

* Use the same rpc method name and output for both edge and supernode for peers/edges

* Allow n2nctl to show raw data returned without resorting to tricks

* Make n2nctl pretty printer understandable with an empty table

* Use the full name for supernodes RPC call

* Use same RPC method name (but some missing fields) for getting communities from both edge and supernode

* Add *_sup_broadcast stats to edge packet stats output

* Refacter the stats into a packetstats method for supernode RPC

* Even if I am not going to prettyprint the timestamps, at least make all the timestamps on the page the same unit

* Simplify the RPC handlers by flagging some as writable and checking that in the multiplexer

* Remove invalid edges data

* Avoid crash on bad data to verbose RPC

* Avoid showing bad or inconsistant protocol data in communities RPC

* Minor clarification on when --write is handled

* Make linter happy

* Fix changed method name in n2nhttpd

* Move mainloop stop flag into the n2n_edge_t structure, allowing access from management commands

* Implement edge RPC stop command

* Move mainloop stop flag into the n2n_sn_t structure, allowing access from management commands

* Implement supernode RPC stop command

* Allow multiple pages to be served from mini httpd

* Extract common script functions into a separate URL

* Handle an edge case in the python rpc class

With a proper tag-based demultiplexer, this case should be a nop,
but we are single-threaded and rely on the packet ordering in this
library.

* Add n2nhttpd support to query supernode using urls prefixed with /supernode/

* Handle missing values in javascript table print

* Add another less filtering javascript key/value renderer

* Add a supernode.html page to the n2nhttpd

* Address lint issue

* Mention the second html page on the Scripts doc

* Remove purgable column from supernode edges list - it looks like it is rarely going to be set

* Add a simple one-line example command at the top of the API documentation

* Acknowledge that this is not the most efficient protocol, but point out that it was not supposed to be

* Make it clear that the n2nctl script works for both edge and supernode

* Fight with inconsistant github runner results

* Turn off the /right/ coverage generator
This commit is contained in:
Hamish Coleman 2021-10-23 06:20:05 +01:00 committed by GitHub
parent e6fcf1c55b
commit ae502d9181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1354 additions and 446 deletions

View File

@ -137,14 +137,20 @@ jobs:
- name: Generate coverage reports
run: |
make gcov
make cover COVERAGEDIR=coverage/${{ matrix.os }}
# This was working fine for tens of jobs, up until 2021-10-19T18:53+0100
# and it still works fine when run from my personal github actions.
# The next run at 2021-10-19T19:08+0100 didnt work.
# Assume that they changed something on the runner - I cannot debug it
# as I do not have a Mac.
#
# make cover COVERAGEDIR=coverage/${{ matrix.os }}
shell: bash
- name: Upload gcovr report artifact
uses: actions/upload-artifact@v2
with:
name: coverage
path: coverage
# - name: Upload gcovr report artifact
# uses: actions/upload-artifact@v2
# with:
# name: coverage
# path: coverage
- name: Upload data to codecov
uses: codecov/codecov-action@v2

View File

@ -117,7 +117,9 @@ endif(DEFINED WIN32)
#add_library(n2n STATIC ${N2N_DIR_SRCS})
add_library(n2n STATIC
src/n2n.c
src/edge_management.c
src/edge_utils.c
src/sn_management.c
src/sn_utils.c
src/wire.c
src/hexdump.c

View File

@ -10,11 +10,13 @@ Default Ports:
- UDP/5644 - edge
- UDP/5645 - supernode
A Quick start example query:
`echo r 1 help | nc -w1 -u 127.0.0.1 5644`
## JSON Query interface
As part of the management interface, A machine readable API exists for the
edge daemon. It takes a simple text request and replies with JSON formatted
data.
A machine readable API is available for both the edge and supernode. It
takes a simple text request and replies with JSON formatted data.
The request is in simple text so that the daemon does not need to include any
complex parser.
@ -34,6 +36,11 @@ pub/sub asynchronous event channels.
The replies will also handle some small amount of re-ordering of the
packets, but that is not an specific goal of the protocol.
Note that this API will reply with a relatively large number of UDP packets
and that it is not intended for high frequency or high volume data transfer.
It was written to use a low amount of memory and to support incremental
generation of the reply data.
With a small amount of effort, the API is intended to be human readable,
but this is intended for debugging.
@ -162,6 +169,10 @@ The substantial bulk of the data in the reply is contained within one or
more `row` packets. The non metadata contents of each `row` packet is
defined entirely by the method called and may change from version to version.
Each `row` packet contains exactly one complete JSON object. The row replies
may be processed incrementally as each row arrives and no batching of multiple
packets will be required.
e.g:
`{"_tag":"108","_type":"row","mode":"p2p","ip4addr":"10.135.98.84","macaddr":"86:56:21:E4:AA:39","sockaddr":"192.168.7.191:41701","desc":"client4","lastseen":1584682200}`

View File

@ -20,7 +20,9 @@ This shell script is used to run automated tests during development.
## `scripts/n2nctl`
This python script provides an easy command line interface to the running
edge. It uses UDP communications to talk to the Management API.
n2n processes. It uses UDP communications to talk to the Management API.
By specifying the right UDP port, it can talk to both the edge and the
supernode daemons.
Example:
- `scripts/n2nctl --help`
@ -33,8 +35,9 @@ a proxy for REST-like HTTP requests to talk to the Management API.
By default it runs on port 8080.
It also provides a simple HTML page showing some information, which when
run with default settings can be seen at http://localhost:8080/
It also provides a simple HTML page showing some edge information, which when
run with default settings can be seen at http://localhost:8080/ (Also
a http://localhost:8080/supernode.html page for the supernode)
Example:
- `scripts/n2nhttpd --help`

View File

@ -37,9 +37,9 @@
*/
//#define SKIP_MULTICAST_PEERS_DISCOVERY
// TODO: this struct is pretty empty now, collapse it to just n2n_edge_t ?
struct tunread_arg {
n2n_edge_t *eee;
int *keep_running;
};
extern HANDLE startTunReadThread (struct tunread_arg *arg);

View File

@ -262,7 +262,7 @@ void edge_send_packet2net (n2n_edge_t *eee, uint8_t *tap_pkt, size_t len);
void edge_read_from_tap (n2n_edge_t *eee);
int edge_get_n2n_socket (n2n_edge_t *eee);
int edge_get_management_socket (n2n_edge_t *eee);
int run_edge_loop (n2n_edge_t *eee, int *keep_running);
int run_edge_loop (n2n_edge_t *eee);
int quick_edge_init (char *device_name, char *community_name,
char *encrypt_key, char *device_mac,
char *local_ip_address,
@ -274,9 +274,11 @@ void sn_init (n2n_sn_t *sss);
void sn_term (n2n_sn_t *sss);
int supernode2sock (n2n_sock_t * sn, const n2n_sn_name_t addrIn);
struct peer_info* add_sn_to_list_by_mac_or_sock (struct peer_info **sn_list, n2n_sock_t *sock, const n2n_mac_t mac, int *skip_add);
int run_sn_loop (n2n_sn_t *sss, int *keep_running);
int run_sn_loop (n2n_sn_t *sss);
int assign_one_ip_subnet (n2n_sn_t *sss, struct sn_community *comm);
const char* compression_str (uint8_t cmpr);
const char* transop_str (enum n2n_transform tr);
void handleMgmtJson (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock);
void handleMgmtJson_sn (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock);
#endif /* _N2N_H_ */

View File

@ -685,6 +685,7 @@ struct n2n_edge {
n2n_edge_conf_t conf;
/* Status */
int *keep_running; /**< Pointer to edge loop stop/go flag */
struct peer_info *curr_sn; /**< Currently active supernode. */
uint8_t sn_wait; /**< Whether we are waiting for a supernode response. */
uint8_t sn_pong; /**< Whether we have seen a PONG since last time reset. */
@ -801,6 +802,7 @@ typedef struct n2n_tcp_connection {
typedef struct n2n_sn {
int *keep_running; /* Pointer to sn loop stop/go flag */
time_t start_time; /* Used to measure uptime. */
n2n_version_t version; /* version string sent to edges along with PEER_INFO a.k.a. PONG */
sn_stats_t stats;
@ -833,10 +835,4 @@ typedef struct n2n_sn {
/* *************************************************** */
typedef struct n2n_mgmt_handler {
char *cmd;
char *help;
void (*func)(n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
} n2n_mgmt_handler_t;
#endif /* _N2N_TYPEDEFS_H_ */

View File

@ -59,6 +59,7 @@ class JsonUDP():
# assert(data['cmd'] == cmd)
result = list()
error = None
while True:
data, _ = self.sock.recvfrom(1024)
@ -69,9 +70,13 @@ class JsonUDP():
continue
if data['_type'] == 'error':
raise ValueError('Error: {}'.format(data['error']))
# we still expect an end packet, so save the error
error = ValueError('Error: {}'.format(data['error']))
continue
if data['_type'] == 'end':
if error:
raise error
return result
if data['_type'] != 'row':
@ -104,10 +109,16 @@ def str_table(rows, columns):
"""Given an array of dicts, do a simple table print"""
result = list()
widths = collections.defaultdict(lambda: 0)
for row in rows:
if len(rows) == 0:
# No data to show, be sure not to truncate the column headings
for col in columns:
if col in row:
widths[col] = max(widths[col], len(str(row[col])))
widths[col] = len(col)
else:
for row in rows:
for col in columns:
if col in row:
widths[col] = max(widths[col], len(str(row[col])))
for col in columns:
if widths[col] == 0:
@ -128,7 +139,7 @@ def str_table(rows, columns):
def subcmd_show_supernodes(rpc, args):
rows = rpc.read('super')
rows = rpc.read('supernodes')
columns = [
'version',
'current',
@ -140,8 +151,8 @@ def subcmd_show_supernodes(rpc, args):
return str_table(rows, columns)
def subcmd_show_peers(rpc, args):
rows = rpc.read('peer')
def subcmd_show_edges(rpc, args):
rows = rpc.read('edges')
columns = [
'mode',
'ip4addr',
@ -160,9 +171,10 @@ def subcmd_show_help(rpc, args):
result += "\n"
result += "Possble remote commands:\n"
result += "(those without pretty-printer, will pass-through)\n\n"
result += "(those without a pretty-printer will pass-through)\n\n"
rows = rpc.read('help')
result += json.dumps(rows, sort_keys=True, indent=4)
for row in rows:
result += "{:12} {}\n".format(row['cmd'], row['help'])
return result
@ -175,9 +187,9 @@ subcmds = {
'func': subcmd_show_supernodes,
'help': 'Show the list of supernodes',
},
'peers': {
'func': subcmd_show_peers,
'help': 'Show the list of peers',
'edges': {
'func': subcmd_show_edges,
'help': 'Show the list of edges/peers',
},
}
@ -196,17 +208,20 @@ def main():
ap = argparse.ArgumentParser(
description='Query the running local n2n edge')
ap.add_argument('-t', '--mgmtport', action='store', default=5644,
help='Management Port (default=5644)')
help='Management Port (default=5644)', type=int)
ap.add_argument('-k', '--key', action='store',
help='Password for mgmt commands')
ap.add_argument('-d', '--debug', action='store_true',
help='Also show raw internal data')
ap.add_argument('--raw', action='store_true',
help='Force cmd to avoid any pretty printing')
group = ap.add_mutually_exclusive_group()
group.add_argument('--read', action='store_true',
help='Make a read request (default)')
group.add_argument('--write', action='store_true',
help='Make a write request')
help='Make a write request (only to non pretty'
'printed cmds)')
ap.add_argument('cmd', action='store',
help='Command to run (try "help" for list)')
@ -215,7 +230,7 @@ def main():
args = ap.parse_args()
if args.cmd not in subcmds:
if args.raw or (args.cmd not in subcmds):
func = subcmd_default
else:
func = subcmds[args.cmd]['func']

View File

@ -9,8 +9,8 @@
#
# Try it out with
# http://localhost:8080/
# http://localhost:8080/edge/peer
# http://localhost:8080/edge/super
# http://localhost:8080/edge/edges
# http://localhost:8080/edge/supernodes
import argparse
import socket
@ -19,6 +19,7 @@ import socketserver
import http.server
import signal
import functools
import base64
from http import HTTPStatus
@ -73,6 +74,7 @@ class JsonUDP():
# assert(data['cmd'] == cmd)
result = list()
error = None
while True:
data, _ = self.sock.recvfrom(1024)
@ -83,9 +85,13 @@ class JsonUDP():
continue
if data['_type'] == 'error':
raise ValueError('Error: {}'.format(data['error']))
# we still expect an end packet, so save the error
error = ValueError('Error: {}'.format(data['error']))
continue
if data['_type'] == 'end':
if error:
raise error
return result
if data['_type'] != 'row':
@ -114,22 +120,49 @@ class JsonUDP():
return self._call('w', cmdline)
indexhtml = """
<html>
<head>
<title>n2n management</title>
</head>
<body>
<div id="time"></div>
<br>
Supernodes:
<div id="super"></div>
<br>
Peers:
<div id="peer"></div>
pages = {
"/script.js": {
"content_type": "text/javascript",
"content": """
var verbose=-1;
<script>
function rows2table(id, columns, data) {
function rows2verbose(id, unused, data) {
row0 = data[0]
verbose = row0['traceLevel']
let div = document.getElementById(id);
div.innerHTML=verbose
}
function rows2keyvalue(id, keys, data) {
let s = "<table border=1 cellspacing=0>"
data.forEach((row) => {
keys.forEach((key) => {
if (key in row) {
s += "<tr><th>" + key + "<td>" + row[key];
}
});
});
s += "</table>"
let div = document.getElementById(id);
div.innerHTML=s
}
function rows2keyvalueall(id, unused, data) {
let s = "<table border=1 cellspacing=0>"
data.forEach((row) => {
Object.keys(row).forEach((key) => {
s += "<tr><th>" + key + "<td>" + row[key];
});
});
s += "</table>"
let div = document.getElementById(id);
div.innerHTML=s
}
function rows2table(id, columns, data) {
let s = "<table border=1 cellspacing=0>"
s += "<tr>"
columns.forEach((col) => {
@ -138,62 +171,227 @@ indexhtml = """
data.forEach((row) => {
s += "<tr>"
columns.forEach((col) => {
s += "<td>" + row[col]
val = row[col]
if (typeof val === "undefined") {
val = ''
}
s += "<td>" + val
});
});
s += "</table>"
let div = document.getElementById(id);
div.innerHTML=s
}
}
function fetch_table(url, id, columns) {
function do_get(url, id, handler, handler_param) {
fetch(url)
.then(function (response) {
return response.json();
})
.then(function (data) {
rows2table(id,columns,data);
handler(id,handler_param,data);
// update the timestamp on success
let now = Math.round(new Date().getTime() / 1000);
let time = document.getElementById('time');
time.innerHTML=now;
})
.catch(function (err) {
console.log('error: ' + err);
});
}
}
function refresh_job() {
let now = new Date().getTime();
function do_post(url, body, id, handler, handler_param) {
fetch(url, {method:'POST', body: body})
.then(function (response) {
return response.json();
})
.then(function (data) {
handler(id,handler_param,data);
})
.catch(function (err) {
console.log('error: ' + err);
});
}
let time = document.getElementById('time');
time.innerHTML="last updated: " + now;
function do_stop(tracelevel) {
// FIXME: uses global in script library
fetch(nodetype + '/stop', {method:'POST'})
}
fetch_table(
'edge/super',
'super',
['version','current','macaddr','sockaddr','uptime']
function setverbose(tracelevel) {
if (tracelevel < 0) {
tracelevel = 0;
}
// FIXME: uses global in script library
do_post(
nodetype + '/verbose', tracelevel, 'verbose',
rows2verbose, null
);
fetch_table(
'edge/peer',
'peer',
['mode','ip4addr','macaddr','sockaddr','desc']
);
}
}
function refresh_setup(interval) {
function refresh_setup(interval) {
var timer = setInterval(refresh_job, interval);
}
}
""",
},
"/": {
"content_type": "text/html; charset=utf-8",
"content": """
<html>
<head>
<title>n2n edge management</title>
</head>
<body>
<table>
<tr>
<td>Last Updated:
<td><div id="time"></div>
<td><button onclick=refresh_job()>update</button>
<td><button onclick=do_stop()>stop edge</button>
<tr>
<td>Logging Verbosity:
<td>
<div id="verbose"></div>
<td>
<button onclick=setverbose(verbose+1)>+</button>
<button onclick=setverbose(verbose-1)>-</button>
</table>
<br>
<div id="communities"></div>
<br>
Edges/Peers:
<div id="edges"></div>
<br>
Supernodes:
<div id="supernodes"></div>
<br>
<div id="timestamps"></div>
<br>
<div id="packetstats"></div>
refresh_setup(5000);
<script src="script.js"></script>
<script>
// FIXME: hacky global
var nodetype="edge";
function refresh_job() {
do_get(
nodetype + '/verbose', 'verbose',
rows2verbose, null
);
do_get(
nodetype + '/communities', 'communities',
rows2keyvalue, ['community']
);
do_get(
nodetype + '/supernodes', 'supernodes',
rows2table, ['version','current','macaddr','sockaddr','uptime']
);
do_get(
nodetype + '/edges', 'edges',
rows2table, ['mode','ip4addr','macaddr','sockaddr','desc']
);
do_get(
nodetype + '/timestamps', 'timestamps',
rows2keyvalueall, null
);
do_get(
nodetype + '/packetstats', 'packetstats',
rows2table, ['type','tx_pkt','rx_pkt']
);
}
refresh_setup(10000);
refresh_job();
</script>
</body>
</html>
"""
""",
},
"/supernode.html": {
"content_type": "text/html; charset=utf-8",
"content": """
<html>
<head>
<title>n2n supernode management</title>
</head>
<body>
<table>
<tr>
<td>Last Updated:
<td><div id="time"></div>
<td><button onclick=refresh_job()>update</button>
<td><button onclick=do_stop()>stop supernode</button>
<tr>
<td>Logging Verbosity:
<td>
<div id="verbose"></div>
<td>
<button onclick=setverbose(verbose+1)>+</button>
<button onclick=setverbose(verbose-1)>-</button>
<td><button onclick=do_reload()>reload communities</button>
</table>
<br>
Communities:
<div id="communities"></div>
<br>
Edges/Peers:
<div id="edges"></div>
<br>
<div id="timestamps"></div>
<br>
<div id="packetstats"></div>
<script src="script.js"></script>
<script>
// FIXME: hacky global
var nodetype="supernode";
function do_reload() {
fetch(nodetype + '/reload_communities', {method:'POST'})
}
function refresh_job() {
do_get(
nodetype + '/verbose', 'verbose',
rows2verbose, null
);
do_get(
nodetype + '/communities', 'communities',
rows2table, ['community','ip4addr','is_federation','purgeable']
);
do_get(
nodetype + '/edges', 'edges',
rows2table,
['community','ip4addr','macaddr','sockaddr','proto','desc']
);
do_get(
nodetype + '/timestamps', 'timestamps',
rows2keyvalueall, null
);
do_get(
nodetype + '/packetstats', 'packetstats',
rows2table, ['type','tx_pkt','rx_pkt']
);
}
refresh_setup(10000);
refresh_job();
</script>
</body>
</html>
""",
},
}
class SimpleHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, rpc, *args, **kwargs):
def __init__(self, rpc, snrpc, *args, **kwargs):
self.rpc = rpc
self.snrpc = snrpc
super().__init__(*args, **kwargs)
def log_request(self, code='-', size='-'):
@ -205,42 +403,102 @@ class SimpleHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(message.encode('utf8'))
def do_GET(self):
url_tail = self.path
def _replyjson(self, data):
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode('utf8'))
if url_tail == "/":
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(indexhtml.encode('utf8'))
return
def _replyunauth(self):
self.send_response(HTTPStatus.UNAUTHORIZED)
self.send_header('WWW-Authenticate', 'Basic realm="n2n"')
self.end_headers()
if url_tail.startswith("/edge/"):
tail = url_tail.split('/')
cmd = tail[2]
# if commands ever need args, use more of the path components
def _extractauth(self, rpc):
# Avoid caching the key inside the object for all clients
rpc.key = None
try:
data = self.rpc.read(cmd)
except ValueError:
self._simplereply(HTTPStatus.BAD_REQUEST, 'Bad Command')
header = self.headers.get('Authorization')
if header is not None:
authtype, encoded = header.split(' ')
if authtype == 'Basic':
user, key = base64.b64decode(encoded).decode('utf8').split(':')
rpc.key = key
if rpc.key is None:
rpc.key = rpc.defaultkey
def _rpc(self, method, cmdline):
try:
data = method(cmdline)
except ValueError as e:
if str(e) == "Error: badauth":
self._replyunauth()
return
self._simplereply(HTTPStatus.BAD_REQUEST, 'Bad Command')
return
self._replyjson(data)
return
def _rpc_read(self, rpc):
self._extractauth(rpc)
tail = self.path.split('/')
cmd = tail[2]
# if reads ever need args, could use more of the tail
self._rpc(rpc.read, cmd)
def _rpc_write(self, rpc):
self._extractauth(rpc)
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf8')
tail = self.path.split('/')
cmd = tail[2]
cmdline = cmd + ' ' + post_data
self._rpc(rpc.write, cmdline)
def do_GET(self):
if self.path.startswith("/edge/"):
self._rpc_read(self.rpc)
return
if self.path.startswith("/supernode/"):
self._rpc_read(self.snrpc)
return
if self.path in pages:
page = pages[self.path]
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'application/json')
self.send_header('Content-type', page['content_type'])
self.end_headers()
self.wfile.write(json.dumps(data).encode('utf8'))
self.wfile.write(page['content'].encode('utf8'))
return
self._simplereply(HTTPStatus.NOT_FOUND, 'Not Found')
return
def do_POST(self):
if self.path.startswith("/edge/"):
self._rpc_write(self.rpc)
return
if self.path.startswith("/supernode/"):
self._rpc_write(self.snrpc)
return
def main():
ap = argparse.ArgumentParser(
description='Control the running local n2n edge via http')
ap.add_argument('-t', '--mgmtport', action='store', default=5644,
help='Management Port (default=5644)')
help='Management Port (default=5644)', type=int)
ap.add_argument('--snmgmtport', action='store', default=5645,
help='Supernode Management Port (default=5645)', type=int)
ap.add_argument('-k', '--key', action='store',
help='Password for mgmt commands')
ap.add_argument('-d', '--debug', action='store_true',
@ -253,12 +511,16 @@ def main():
rpc = JsonUDP(args.mgmtport)
rpc.debug = args.debug
rpc.key = args.key
rpc.defaultkey = args.key
snrpc = JsonUDP(args.snmgmtport)
snrpc.debug = args.debug
snrpc.defaultkey = args.key
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
socketserver.TCPServer.allow_reuse_address = True
handler = functools.partial(SimpleHandler, rpc)
handler = functools.partial(SimpleHandler, rpc, snrpc)
with socketserver.TCPServer(("", args.port), handler) as httpd:
try:
httpd.serve_forever()

View File

@ -1300,8 +1300,9 @@ int main (int argc, char* argv[]) {
#endif
keep_on_running = 1;
eee->keep_running = &keep_on_running;
traceEvent(TRACE_NORMAL, "edge started");
rc = run_edge_loop(eee, &keep_on_running);
rc = run_edge_loop(eee);
print_edge_stats(eee);
#ifdef HAVE_LIBCAP

449
src/edge_management.c Normal file
View File

@ -0,0 +1,449 @@
/**
* (C) 2007-21 - ntop.org and contributors
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not see see <http://www.gnu.org/licenses/>
*
*/
#include "n2n.h"
#include "edge_utils_win32.h"
#define FLAG_WROK 1
typedef struct n2n_mgmt_handler {
int flags;
char *cmd;
char *help;
void (*func)(n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
} n2n_mgmt_handler_t;
static void mgmt_error (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, char *tag, char *msg) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"error\","
"\"error\":\"%s\"}\n",
tag,
msg);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_stop (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(type==N2N_MGMT_WRITE) {
*eee->keep_running = 0;
}
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"keep_running\":%u}\n",
tag,
*eee->keep_running);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_verbose (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(type==N2N_MGMT_WRITE) {
if(argv) {
setTraceLevel(strtoul(argv, NULL, 0));
}
}
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"traceLevel\":%u}\n",
tag,
getTraceLevel());
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_communities (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(eee->conf.header_encryption != HEADER_ENCRYPTION_NONE) {
mgmt_error(eee, udp_buf, sender_sock, tag, "noaccess");
return;
}
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"community\":\"%s\"}",
tag,
eee->conf.community_name);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_supernodes (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
struct peer_info *peer, *tmpPeer;
macstr_t mac_buf;
n2n_sock_str_t sockbuf;
selection_criterion_str_t sel_buf;
HASH_ITER(hh, eee->conf.supernodes, peer, tmpPeer) {
/*
* TODO:
* The version string provided by the remote supernode could contain
* chars that make our JSON invalid.
* - do we care?
*/
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"version\":\"%s\","
"\"purgeable\":%i,"
"\"current\":%i,"
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"selection\":\"%s\","
"\"lastseen\":%li,"
"\"uptime\":%li}\n",
tag,
peer->version,
peer->purgeable,
(peer == eee->curr_sn) ? (eee->sn_wait ? 2 : 1 ) : 0,
is_null_mac(peer->mac_addr) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
sn_selection_criterion_str(sel_buf, peer),
peer->last_seen,
peer->uptime);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
static void mgmt_edges_row (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, char *tag, struct peer_info *peer, char *mode) {
size_t msg_len;
macstr_t mac_buf;
n2n_sock_str_t sockbuf;
dec_ip_bit_str_t ip_bit_str = {'\0'};
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"mode\":\"%s\","
"\"ip4addr\":\"%s\","
"\"purgeable\":%i,"
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"desc\":\"%s\","
"\"last_seen\":%li}\n",
tag,
mode,
(peer->dev_addr.net_addr == 0) ? "" : ip_subnet_to_str(ip_bit_str, &peer->dev_addr),
peer->purgeable,
(is_null_mac(peer->mac_addr)) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
peer->dev_desc,
peer->last_seen);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0 /*flags*/,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_edges (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
struct peer_info *peer, *tmpPeer;
// dump nodes with forwarding through supernodes
HASH_ITER(hh, eee->pending_peers, peer, tmpPeer) {
mgmt_edges_row(eee, udp_buf, sender_sock, tag, peer, "pSp");
}
// dump peer-to-peer nodes
HASH_ITER(hh, eee->known_peers, peer, tmpPeer) {
mgmt_edges_row(eee, udp_buf, sender_sock, tag, peer, "p2p");
}
}
static void mgmt_timestamps (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"start_time\":%lu,"
"\"last_super\":%ld,"
"\"last_p2p\":%ld}\n",
tag,
eee->start_time,
eee->last_sup,
eee->last_p2p);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_packetstats (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"transop\","
"\"tx_pkt\":%lu,"
"\"rx_pkt\":%lu}\n",
tag,
eee->transop.tx_cnt,
eee->transop.rx_cnt);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"p2p\","
"\"tx_pkt\":%u,"
"\"rx_pkt\":%u}\n",
tag,
eee->stats.tx_p2p,
eee->stats.rx_p2p);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"super\","
"\"tx_pkt\":%u,"
"\"rx_pkt\":%u}\n",
tag,
eee->stats.tx_sup,
eee->stats.rx_sup);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"super_broadcast\","
"\"tx_pkt\":%u,"
"\"rx_pkt\":%u}\n",
tag,
eee->stats.tx_sup_broadcast,
eee->stats.rx_sup_broadcast);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_unimplemented (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
mgmt_error(eee, udp_buf, sender_sock, tag, "unimplemented");
}
static void mgmt_help (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
n2n_mgmt_handler_t mgmt_handlers[] = {
{ .cmd = "reload_communities", .flags = FLAG_WROK, .help = "Reserved for supernode", .func = mgmt_unimplemented},
{ .cmd = "stop", .flags = FLAG_WROK, .help = "Gracefully exit edge", .func = mgmt_stop},
{ .cmd = "verbose", .flags = FLAG_WROK, .help = "Manage verbosity level", .func = mgmt_verbose},
{ .cmd = "communities", .help = "Show current community", .func = mgmt_communities},
{ .cmd = "edges", .help = "List current edges/peers", .func = mgmt_edges},
{ .cmd = "supernodes", .help = "List current supernodes", .func = mgmt_supernodes},
{ .cmd = "timestamps", .help = "Event timestamps", .func = mgmt_timestamps},
{ .cmd = "packetstats", .help = "traffic counters", .func = mgmt_packetstats},
{ .cmd = "help", .flags = FLAG_WROK, .help = "Show JSON commands", .func = mgmt_help},
{ .cmd = NULL },
};
static void mgmt_help (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
n2n_mgmt_handler_t *handler;
/*
* Even though this command is readonly, we deliberately do not check
* the type - allowing help replies to both read and write requests
*/
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"cmd\":\"%s\","
"\"help\":\"%s\"}\n",
tag,
handler->cmd,
handler->help);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
/*
* Check if the user is authorised for this command.
* - this should be more configurable!
* - for the moment we use some simple heuristics:
* Reads are not dangerous, so they are simply allowed
* Writes are possibly dangerous, so they need a fake password
*/
static int mgmt_auth (const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *auth, char *argv0, char *argv) {
if(auth) {
/* If we have an auth key, it must match */
if(0 == strcmp(auth,"CHANGEME")) {
return 1;
}
return 0;
}
/* if we dont have an auth key, we can still read */
if(type==N2N_MGMT_READ) {
return 1;
}
return 0;
}
void handleMgmtJson (n2n_edge_t *eee, char *udp_buf, const struct sockaddr_in sender_sock) {
char cmdlinebuf[80];
enum n2n_mgmt_type type;
char *typechar;
char *options;
char *argv0;
char *argv;
char *tag;
char *flagstr;
int flags;
char *auth;
n2n_mgmt_handler_t *handler;
size_t msg_len;
/* save a copy of the commandline before we reuse the udp_buf */
strncpy(cmdlinebuf, udp_buf, sizeof(cmdlinebuf)-1);
cmdlinebuf[sizeof(cmdlinebuf)-1] = 0;
traceEvent(TRACE_DEBUG, "mgmt json %s", cmdlinebuf);
typechar = strtok(cmdlinebuf, " \r\n");
if(!typechar) {
/* should not happen */
mgmt_error(eee, udp_buf, sender_sock, "-1", "notype");
return;
}
if(*typechar == 'r') {
type=N2N_MGMT_READ;
} else if(*typechar == 'w') {
type=N2N_MGMT_WRITE;
} else {
/* dunno how we got here */
mgmt_error(eee, udp_buf, sender_sock, "-1", "badtype");
return;
}
/* Extract the tag to use in all reply packets */
options = strtok(NULL, " \r\n");
if(!options) {
mgmt_error(eee, udp_buf, sender_sock, "-1", "nooptions");
return;
}
argv0 = strtok(NULL, " \r\n");
if(!argv0) {
mgmt_error(eee, udp_buf, sender_sock, "-1", "nocmd");
return;
}
/*
* The entire rest of the line is the argv. We apply no processing
* or arg separation so that the cmd can use it however it needs.
*/
argv = strtok(NULL, "\r\n");
/*
* There might be an auth token mixed in with the tag
*/
tag = strtok(options, ":");
flagstr = strtok(NULL, ":");
if(flagstr) {
flags = strtoul(flagstr, NULL, 16);
} else {
flags = 0;
}
/* Only 1 flag bit defined at the moment - "auth option present" */
if(flags & 1) {
auth = strtok(NULL, ":");
} else {
auth = NULL;
}
if(!mgmt_auth(sender_sock, type, auth, argv0, argv)) {
mgmt_error(eee, udp_buf, sender_sock, tag, "badauth");
return;
}
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
if(0 == strcmp(handler->cmd, argv0)) {
break;
}
}
if(!handler->cmd) {
mgmt_error(eee, udp_buf, sender_sock, tag, "unknowncmd");
return;
}
if((type==N2N_MGMT_WRITE) && !(handler->flags & FLAG_WROK)) {
mgmt_error(eee, udp_buf, sender_sock, tag, "readonly");
return;
}
/*
* TODO:
* The tag provided by the requester could contain chars
* that make our JSON invalid.
* - do we care?
*/
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"begin\",\"cmd\":\"%s\"}\n", tag, argv0);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
handler->func(eee, udp_buf, sender_sock, type, tag, argv0, argv);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"end\"}\n", tag);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
return;
}

View File

@ -22,7 +22,7 @@
/* heap allocation for compression as per lzo example doc */
#define HEAP_ALLOC(var,size) lzo_align_t __LZO_MMODEL var [ ((size) + (sizeof(lzo_align_t) - 1)) / sizeof(lzo_align_t) ]
static HEAP_ALLOC (wrkmem, LZO1X_1_MEM_COMPRESS);
static HEAP_ALLOC(wrkmem, LZO1X_1_MEM_COMPRESS);
/* ************************************** */
@ -247,7 +247,7 @@ static int detect_local_ip_address (n2n_sock_t* out_sock, const n2n_edge_t* eee)
// open socket, close it before if TCP
// in case of TCP, 'connect()' is required
int supernode_connect(n2n_edge_t *eee) {
int supernode_connect (n2n_edge_t *eee) {
int sockopt;
struct sockaddr_in sn_sock;
@ -334,7 +334,7 @@ int supernode_connect(n2n_edge_t *eee) {
// always closes the socket
void supernode_disconnect(n2n_edge_t *eee) {
void supernode_disconnect (n2n_edge_t *eee) {
if(eee->sock >= 0) {
closesocket(eee->sock);
@ -480,7 +480,7 @@ n2n_edge_t* edge_init (const n2n_edge_conf_t *conf, int *rv) {
}
if(resolve_create_thread(&(eee->resolve_parameter), eee->conf.supernodes) == 0) {
traceEvent(TRACE_NORMAL, "successfully created resolver thread");
traceEvent(TRACE_NORMAL, "successfully created resolver thread");
}
eee->network_traffic_filter = create_network_traffic_filter();
@ -490,7 +490,7 @@ n2n_edge_t* edge_init (const n2n_edge_conf_t *conf, int *rv) {
*rv = 0;
return(eee);
edge_init_error:
edge_init_error:
if(eee)
free(eee);
*rv = rc;
@ -859,7 +859,7 @@ static int get_local_auth (n2n_edge_t *eee, n2n_auth_t *auth) {
speck_128_encrypt(auth->token + N2N_PRIVATE_PUBLIC_KEY_SIZE, (speck_context_t*)eee->conf.shared_secret_ctx);
break;
default:
break;
break;
}
return 0;
@ -992,7 +992,7 @@ static void check_known_peer_sock_change (n2n_edge_t *eee,
/** Send a datagram to a socket file descriptor */
static ssize_t sendto_fd (n2n_edge_t *eee, const void *buf,
size_t len, struct sockaddr_in *dest) {
size_t len, struct sockaddr_in *dest) {
ssize_t sent = 0;
int rc = 1;
@ -1010,7 +1010,7 @@ static ssize_t sendto_fd (n2n_edge_t *eee, const void *buf,
rc = select(eee->sock + 1, NULL, &socket_mask, NULL, &wait_time);
}
if (rc > 0) {
if(rc > 0) {
sent = sendto(eee->sock, buf, len, 0 /*flags*/,
(struct sockaddr *)dest, sizeof(struct sockaddr_in));
@ -1175,7 +1175,7 @@ void send_query_peer (n2n_edge_t * eee,
if(eee->conf.header_encryption == HEADER_ENCRYPTION_ENABLED) {
packet_header_encrypt(pktbuf, idx, idx,
eee->conf.header_encryption_ctx_dynamic, eee->conf.header_iv_ctx_dynamic,
time_stamp ());
time_stamp());
}
sendto_sock(eee, pktbuf, idx, &(eee->curr_sn->sock));
@ -1186,7 +1186,7 @@ void send_query_peer (n2n_edge_t * eee,
if(eee->conf.header_encryption == HEADER_ENCRYPTION_ENABLED) {
packet_header_encrypt(pktbuf, idx, idx,
eee->conf.header_encryption_ctx_dynamic, eee->conf.header_iv_ctx_dynamic,
time_stamp ());
time_stamp());
}
n_o_pings = eee->conf.number_max_sn_pings;
@ -1525,7 +1525,7 @@ void update_supernode_reg (n2n_edge_t * eee, time_t now) {
// determine time offset to apply on last_register_req for
// all edges's next re-registration does not happen all at once
if (eee->sn_wait == 2) {
if(eee->sn_wait == 2) {
// remaining 1/4 is greater than 1/10 fast retry allowance;
// '%' might be expensive but does not happen all too often
off = n2n_rand() % ((eee->conf.register_interval * 3) / 4);
@ -1697,7 +1697,7 @@ static int handle_PACKET (n2n_edge_t * eee,
if(rx_compression_id != N2N_COMPRESSION_ID_NONE) {
traceEvent(TRACE_DEBUG, "payload decompression %s: deflated %u bytes to %u bytes",
compression_str(rx_compression_id), eth_size, (int)deflated_len);
memcpy(eth_payload ,deflation_buffer, deflated_len );
memcpy(eth_payload,deflation_buffer, deflated_len );
eth_size = deflated_len;
free(deflation_buffer);
}
@ -1806,299 +1806,9 @@ static char *get_ip_from_arp (dec_ip_str_t buf, const n2n_mac_t req_mac) {
#endif
#endif
static void handleMgmtJson_error (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, char *tag, char *msg) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"error\","
"\"error\":\"%s\"}\n",
tag,
msg);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void handleMgmtJson_super (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
struct peer_info *peer, *tmpPeer;
macstr_t mac_buf;
n2n_sock_str_t sockbuf;
selection_criterion_str_t sel_buf;
if(type!=N2N_MGMT_READ) {
handleMgmtJson_error(eee, udp_buf, sender_sock, tag, "readonly");
return;
}
HASH_ITER(hh, eee->conf.supernodes, peer, tmpPeer) {
/*
* TODO:
* The version string provided by the remote supernode could contain
* chars that make our JSON invalid.
* - do we care?
*/
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"version\":\"%s\","
"\"purgeable\":%i,"
"\"current\":%i,"
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"selection\":\"%s\","
"\"lastseen\":%li,"
"\"uptime\":%li}\n",
tag,
peer->version,
peer->purgeable,
(peer == eee->curr_sn) ? (eee->sn_wait ? 2 : 1 ) : 0,
is_null_mac(peer->mac_addr) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
sn_selection_criterion_str(sel_buf, peer),
peer->last_seen,
peer->uptime);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
static void handleMgmtJson_peer (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
struct peer_info *peer, *tmpPeer;
macstr_t mac_buf;
n2n_sock_str_t sockbuf;
in_addr_t net;
if(type!=N2N_MGMT_READ) {
handleMgmtJson_error(eee, udp_buf, sender_sock, tag, "readonly");
return;
}
/* FIXME:
* dont repeat yourself - the body of these two loops is identical
*/
// dump nodes with forwarding through supernodes
HASH_ITER(hh, eee->pending_peers, peer, tmpPeer) {
net = htonl(peer->dev_addr.net_addr);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"mode\":\"pSp\","
"\"ip4addr\":\"%s\","
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"desc\":\"%s\","
"\"lastseen\":%li}\n",
tag,
(peer->dev_addr.net_addr == 0) ? "" : inet_ntoa(*(struct in_addr *) &net),
(is_null_mac(peer->mac_addr)) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
peer->dev_desc,
peer->last_seen);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0/*flags*/,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
// dump peer-to-peer nodes
HASH_ITER(hh, eee->known_peers, peer, tmpPeer) {
net = htonl(peer->dev_addr.net_addr);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"mode\":\"p2p\","
"\"ip4addr\":\"%s\","
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"desc\":\"%s\","
"\"lastseen\":%li}\n",
tag,
(peer->dev_addr.net_addr == 0) ? "" : inet_ntoa(*(struct in_addr *) &net),
(is_null_mac(peer->mac_addr)) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
peer->dev_desc,
peer->last_seen);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0/*flags*/,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
static void handleMgmtJson_help (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
n2n_mgmt_handler_t mgmt_handlers[] = {
{ .cmd = "peer", .help = "List current peers", .func = handleMgmtJson_peer},
{ .cmd = "super", .help = "List current supernodes", .func = handleMgmtJson_super},
{ .cmd = "help", .help = "Show JSON commands", .func = handleMgmtJson_help},
{ .cmd = NULL },
};
static void handleMgmtJson_help (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
n2n_mgmt_handler_t *handler;
/*
* Even though this command is readonly, we deliberately do not check
* the type - allowing help replys to both read and write requests
*/
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"cmd\":\"%s\","
"\"help\":\"%s\"}\n",
tag,
handler->cmd,
handler->help);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
/*
* Check if the user is authorised for this command.
* - this should be more configurable!
* - for the moment we use some simple heuristics:
* Reads are not dangerous, so they are simply allowed
* Writes are possibly dangerous, so they need a fake password
*/
int handleMgmtJson_auth(struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *auth, char *argv0, char *argv) {
if(auth) {
/* If we have an auth key, it must match */
if(0 == strcmp(auth,"CHANGEME")) {
return 1;
}
return 0;
}
/* if we dont have an auth key, we can still read */
if(type==N2N_MGMT_READ) {
return 1;
}
return 0;
}
static void handleMgmtJson (n2n_edge_t *eee, char *udp_buf, struct sockaddr_in sender_sock) {
char cmdlinebuf[80];
enum n2n_mgmt_type type;
char *typechar;
char *options;
char *argv0;
char *argv;
char *tag;
char *flagstr;
int flags;
char *auth;
n2n_mgmt_handler_t *handler;
size_t msg_len;
/* save a copy of the commandline before we reuse the udp_buf */
strncpy(cmdlinebuf, udp_buf, sizeof(cmdlinebuf)-1);
cmdlinebuf[sizeof(cmdlinebuf)-1] = 0;
traceEvent(TRACE_DEBUG, "mgmt json %s", cmdlinebuf);
typechar = strtok(cmdlinebuf, " \r\n");
if(!typechar) {
/* should not happen */
handleMgmtJson_error(eee, udp_buf, sender_sock, "-1", "notype");
return;
}
if(*typechar == 'r') {
type=N2N_MGMT_READ;
} else if(*typechar == 'w') {
type=N2N_MGMT_WRITE;
} else {
/* dunno how we got here */
handleMgmtJson_error(eee, udp_buf, sender_sock, "-1", "badtype");
return;
}
/* Extract the tag to use in all reply packets */
options = strtok(NULL, " \r\n");
if(!options) {
handleMgmtJson_error(eee, udp_buf, sender_sock, "-1", "nooptions");
return;
}
argv0 = strtok(NULL, " \r\n");
if(!argv0) {
handleMgmtJson_error(eee, udp_buf, sender_sock, "-1", "nocmd");
return;
}
/*
* The entire rest of the line is the argv. We apply no processing
* or arg separation so that the cmd can use it however it needs.
*/
argv = strtok(NULL, "\r\n");
/*
* There might be an auth token mixed in with the tag
*/
tag = strtok(options, ":");
flagstr = strtok(NULL, ":");
if (flagstr) {
flags = strtoul(flagstr, NULL, 16);
} else {
flags = 0;
}
/* Only 1 flag bit defined at the moment - "auth option present" */
if (flags & 1) {
auth = strtok(NULL, ":");
} else {
auth = NULL;
}
if(!handleMgmtJson_auth(sender_sock, type, auth, argv0, argv)) {
handleMgmtJson_error(eee, udp_buf, sender_sock, tag, "badauth");
return;
}
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
if(0 == strcmp(handler->cmd, argv0)) {
break;
}
}
if(!handler->cmd) {
handleMgmtJson_error(eee, udp_buf, sender_sock, tag, "unknowncmd");
return;
}
/*
* TODO:
* The tag provided by the requester could contain chars
* that make our JSON invalid.
* - do we care?
*/
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"begin\",\"cmd\":\"%s\"}\n", tag, argv0);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
handler->func(eee, udp_buf, sender_sock, type, tag, argv0, argv);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"end\"}\n", tag);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
return;
}
/** Read a datagram from the management UDP socket and take appropriate
* action. */
static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
static void readFromMgmtSocket (n2n_edge_t *eee) {
char udp_buf[N2N_PKT_BUF_SIZE]; /* Compete UDP packet */
ssize_t recvlen;
@ -2155,7 +1865,7 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
if(0 == memcmp(udp_buf, "stop", 4)) {
traceEvent(TRACE_NORMAL, "stop command received");
*keep_running = 0;
*eee->keep_running = 0;
return;
}
@ -2194,7 +1904,7 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
if((udp_buf[0] == 'r' || udp_buf[0] == 'w') && (udp_buf[1] == ' ')) {
/* this is a JSON request */
handleMgmtJson(eee, (char *)udp_buf, sender_sock);
handleMgmtJson(eee, udp_buf, sender_sock);
return;
}
@ -2216,7 +1926,7 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
HASH_ITER(hh, eee->pending_peers, peer, tmpPeer) {
++num_pending_peers;
net = htonl(peer->dev_addr.net_addr);
snprintf (time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
snprintf(time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"%4u | %-15s | %-17s | %-21s | %-15s | %9s |\n",
++num,
@ -2240,7 +1950,7 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
HASH_ITER(hh, eee->known_peers, peer, tmpPeer) {
++num_known_peers;
net = htonl(peer->dev_addr.net_addr);
snprintf (time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
snprintf(time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"%4u | %-15s | %-17s | %-21s | %-15s | %9s |\n",
++num,
@ -2263,8 +1973,8 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
"SUPERNODES\n");
HASH_ITER(hh, eee->conf.supernodes, peer, tmpPeer) {
net = htonl(peer->dev_addr.net_addr);
snprintf (time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
snprintf (uptime_buf, sizeof(uptime_buf), "%10u", (unsigned int)(peer->uptime));
snprintf(time_buf, sizeof(time_buf), "%9u", (unsigned int)(now - peer->last_seen));
snprintf(uptime_buf, sizeof(uptime_buf), "%10u", (unsigned int)(peer->uptime));
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"%-19s %1s%1s | %-17s | %-21s | %-15s | %9s | %10s\n",
peer->version,
@ -2293,9 +2003,9 @@ static void readFromMgmtSocket (n2n_edge_t *eee, int *keep_running) {
"pend_peers %u | ",
num_pending_peers);
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"known_peers %u | ",
num_known_peers);
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"known_peers %u | ",
num_known_peers);
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"transop %u,%u\n",
@ -2519,9 +2229,9 @@ void edge_send_packet2net (n2n_edge_t * eee,
uint8_t * compression_buffer = NULL;
int32_t compression_len;
switch (eee->conf.compression) {
switch(eee->conf.compression) {
case N2N_COMPRESSION_ID_LZO:
compression_buffer = malloc (len + len / 16 + 64 + 3);
compression_buffer = malloc(len + len / 16 + 64 + 3);
if(lzo1x_1_compress(tap_pkt, len, compression_buffer, (lzo_uint*)&compression_len, wrkmem) == LZO_E_OK) {
if(compression_len < len) {
pkt.compression = N2N_COMPRESSION_ID_LZO;
@ -2531,7 +2241,7 @@ void edge_send_packet2net (n2n_edge_t * eee,
#ifdef N2N_HAVE_ZSTD
case N2N_COMPRESSION_ID_ZSTD:
compression_len = N2N_PKT_BUF_SIZE + 128;
compression_buffer = malloc (compression_len); // leaves enough room, for exact size call compression_len = ZSTD_compressBound (len); (slower)
compression_buffer = malloc(compression_len); // leaves enough room, for exact size call compression_len = ZSTD_compressBound (len); (slower)
compression_len = (int32_t)ZSTD_compress(compression_buffer, compression_len, tap_pkt, len, ZSTD_COMPRESSION_LEVEL);
if(!ZSTD_isError(compression_len)) {
if(compression_len < len) {
@ -2767,7 +2477,7 @@ void process_udp (n2n_edge_t *eee, const struct sockaddr_in *sender_sock, const
from_supernode = cmn.flags & N2N_FLAGS_FROM_SUPERNODE;
if(from_supernode) {
skip_add = SN_ADD_SKIP;
sn = add_sn_to_list_by_mac_or_sock (&(eee->conf.supernodes), &sender, null_mac, &skip_add);
sn = add_sn_to_list_by_mac_or_sock(&(eee->conf.supernodes), &sender, null_mac, &skip_add);
if(!sn) {
traceEvent(TRACE_DEBUG, "dropped incoming data from unknown supernode");
return;
@ -3300,7 +3010,7 @@ void print_edge_stats (const n2n_edge_t *eee) {
/* ************************************** */
int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
int run_edge_loop (n2n_edge_t *eee) {
size_t numPurged;
time_t lastIfaceCheck = 0;
@ -3315,11 +3025,10 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
#ifdef WIN32
struct tunread_arg arg;
arg.eee = eee;
arg.keep_running = keep_running;
HANDLE tun_read_thread = startTunReadThread(&arg);
#endif
*keep_running = 1;
*eee->keep_running = 1;
update_supernode_reg(eee, time(NULL));
/* Main loop
@ -3329,7 +3038,7 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
* readFromIPSocket() or edge_read_from_tap()
*/
while(*keep_running) {
while(*eee->keep_running) {
int rc, max_sock = 0;
fd_set socket_mask;
@ -3375,10 +3084,10 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
// external
if(FD_ISSET(eee->sock, &socket_mask)) {
if (0 != fetch_and_eventually_process_data(eee, eee->sock,
pktbuf, &expected, &position,
now)) {
*keep_running = 0;
if(0 != fetch_and_eventually_process_data(eee, eee->sock,
pktbuf, &expected, &position,
now)) {
*eee->keep_running = 0;
break;
}
if(eee->conf.connect_tcp) {
@ -3396,10 +3105,10 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
#ifndef SKIP_MULTICAST_PEERS_DISCOVERY
if(FD_ISSET(eee->udp_multicast_sock, &socket_mask)) {
if (0 != fetch_and_eventually_process_data (eee, eee->udp_multicast_sock,
pktbuf, &expected, &position,
now)) {
*keep_running = 0;
if(0 != fetch_and_eventually_process_data(eee, eee->udp_multicast_sock,
pktbuf, &expected, &position,
now)) {
*eee->keep_running = 0;
break;
}
}
@ -3407,9 +3116,9 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) {
if(FD_ISSET(eee->udp_mgmt_sock, &socket_mask)) {
// read from the management port socket
readFromMgmtSocket(eee, keep_running);
readFromMgmtSocket(eee);
if(!(*keep_running))
if(!(*eee->keep_running))
break;
}
@ -3556,7 +3265,7 @@ static int edge_init_sockets (n2n_edge_t *eee) {
#ifdef __linux__
static uint32_t get_gateway_ip() {
static uint32_t get_gateway_ip () {
FILE *fd;
char *token = NULL;
@ -3780,7 +3489,7 @@ static int routectl (int cmd, int flags, n2n_route_t *route, int if_idx) {
traceEvent(TRACE_DEBUG, route_cmd_to_str(cmd, route, route_buf, sizeof(route_buf)));
rv = 0;
out:
out:
close(nl_sock);
return(rv);
@ -4096,11 +3805,12 @@ int quick_edge_init (char *device_name, char *community_name,
if((eee = edge_init(&conf, &rv)) == NULL)
goto quick_edge_init_end;
rv = run_edge_loop(eee, keep_on_running);
eee->keep_running = keep_on_running;
rv = run_edge_loop(eee);
edge_term(eee);
edge_term_conf(&conf);
quick_edge_init_end:
quick_edge_init_end:
tuntap_close(&tuntap);
return(rv);
}

View File

@ -26,7 +26,7 @@ static DWORD* tunReadThread (LPVOID lpArg) {
struct tunread_arg *arg = (struct tunread_arg*)lpArg;
while(*arg->keep_running) {
while(*arg->eee->keep_running) {
edge_read_from_tap(arg->eee);
}

View File

@ -68,7 +68,8 @@ int main() {
}
keep_running = 1;
rc = run_edge_loop(eee, &keep_running);
eee->keep_running = &keep_running;
rc = run_edge_loop(eee);
edge_term(eee);
tuntap_close(&tuntap);

View File

@ -42,7 +42,8 @@ int main () {
sn_init(&sss_node);
keep_running = 1;
rc = run_sn_loop(&sss_node, &keep_running);
sss_node.keep_running = &keep_running;
rc = run_sn_loop(&sss_node);
sn_term(&sss_node);

439
src/sn_management.c Normal file
View File

@ -0,0 +1,439 @@
/**
* (C) 2007-21 - ntop.org and contributors
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not see see <http://www.gnu.org/licenses/>
*
*/
/*
* This file has a large amount of duplication with the edge_management.c
* code. In the fullness of time, they should both be merged
*/
#include "n2n.h"
#include "edge_utils_win32.h"
int load_allowed_sn_community (n2n_sn_t *sss); /* defined in sn_utils.c */
#define FLAG_WROK 1
typedef struct n2n_mgmt_handler {
int flags;
char *cmd;
char *help;
void (*func)(n2n_sn_t *sss, char *udp_buf, struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
} n2n_mgmt_handler_t;
static void mgmt_error (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, char *tag, char *msg) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"error\","
"\"error\":\"%s\"}\n",
tag,
msg);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_stop (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(type==N2N_MGMT_WRITE) {
*sss->keep_running = 0;
}
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"keep_running\":%u}\n",
tag,
*sss->keep_running);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_verbose (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(type==N2N_MGMT_WRITE) {
if(argv) {
setTraceLevel(strtoul(argv, NULL, 0));
}
}
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"traceLevel\":%u}\n",
tag,
getTraceLevel());
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_reload_communities (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
if(type!=N2N_MGMT_WRITE) {
mgmt_error(sss, udp_buf, sender_sock, tag, "writeonly");
return;
}
if(!sss->community_file) {
mgmt_error(sss, udp_buf, sender_sock, tag, "nofile");
return;
}
int ok = load_allowed_sn_community(sss);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"ok\":%i}\n",
tag,
ok);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_timestamps (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"start_time\":%lu,"
"\"last_fwd\":%ld,"
"\"last_reg_super\":%ld}\n",
tag,
sss->start_time,
sss->stats.last_fwd,
sss->stats.last_reg_super);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_packetstats (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"forward\","
"\"tx_pkt\":%lu}\n",
tag,
sss->stats.fwd);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"broadcast\","
"\"tx_pkt\":%lu}\n",
tag,
sss->stats.broadcast);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"reg_super\","
"\"rx_pkt\":%lu,"
"\"nak\":%lu}\n",
tag,
sss->stats.reg_super,
sss->stats.reg_super_nak);
/* Note: reg_super_nak is not currently incremented anywhere */
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
/* Generic errors when trying to sendto() */
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"type\":\"errors\","
"\"tx_pkt\":%lu}\n",
tag,
sss->stats.errors);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
static void mgmt_communities (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
struct sn_community *community, *tmp;
dec_ip_bit_str_t ip_bit_str = {'\0'};
HASH_ITER(hh, sss->communities, community, tmp) {
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"community\":\"%s\","
"\"purgeable\":%i,"
"\"is_federation\":%i,"
"\"ip4addr\":\"%s\"}\n",
tag,
(community->is_federation) ? "-/-" : community->community,
community->purgeable,
community->is_federation,
(community->auto_ip_net.net_addr == 0) ? "" : ip_subnet_to_str(ip_bit_str, &community->auto_ip_net));
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
static void mgmt_edges (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
struct sn_community *community, *tmp;
struct peer_info *peer, *tmpPeer;
macstr_t mac_buf;
n2n_sock_str_t sockbuf;
dec_ip_bit_str_t ip_bit_str = {'\0'};
HASH_ITER(hh, sss->communities, community, tmp) {
HASH_ITER(hh, community->edges, peer, tmpPeer) {
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"community\":\"%s\","
"\"ip4addr\":\"%s\","
"\"purgeable\":%i,"
"\"macaddr\":\"%s\","
"\"sockaddr\":\"%s\","
"\"proto\":\"%s\","
"\"desc\":\"%s\","
"\"last_seen\":%li}\n",
tag,
(community->is_federation) ? "-/-" : community->community,
(peer->dev_addr.net_addr == 0) ? "" : ip_subnet_to_str(ip_bit_str, &peer->dev_addr),
peer->purgeable,
(is_null_mac(peer->mac_addr)) ? "" : macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
((peer->socket_fd >= 0) && (peer->socket_fd != sss->sock)) ? "TCP" : "UDP",
peer->dev_desc,
peer->last_seen);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
}
static void mgmt_unimplemented (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
mgmt_error(sss, udp_buf, sender_sock, tag, "unimplemented");
}
static void mgmt_help (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv);
n2n_mgmt_handler_t mgmt_handlers[] = {
{ .cmd = "supernodes", .help = "Reserved for edge", .func = mgmt_unimplemented},
{ .cmd = "stop", .flags = FLAG_WROK, .help = "Gracefully exit edge", .func = mgmt_stop},
{ .cmd = "verbose", .flags = FLAG_WROK, .help = "Manage verbosity level", .func = mgmt_verbose},
{ .cmd = "reload_communities", .flags = FLAG_WROK, .help = "Reloads communities and user's public keys", .func = mgmt_reload_communities},
{ .cmd = "communities", .help = "List current communities", .func = mgmt_communities},
{ .cmd = "edges", .help = "List current edges/peers", .func = mgmt_edges},
{ .cmd = "timestamps", .help = "Event timestamps", .func = mgmt_timestamps},
{ .cmd = "packetstats", .help = "Traffic statistics", .func = mgmt_packetstats},
{ .cmd = "help", .flags = FLAG_WROK, .help = "Show JSON commands", .func = mgmt_help},
{ .cmd = NULL },
};
static void mgmt_help (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *tag, char *argv0, char *argv) {
size_t msg_len;
n2n_mgmt_handler_t *handler;
/*
* Even though this command is readonly, we deliberately do not check
* the type - allowing help replies to both read and write requests
*/
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{"
"\"_tag\":\"%s\","
"\"_type\":\"row\","
"\"cmd\":\"%s\","
"\"help\":\"%s\"}\n",
tag,
handler->cmd,
handler->help);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
}
}
/*
* Check if the user is authorised for this command.
* - this should be more configurable!
* - for the moment we use some simple heuristics:
* Reads are not dangerous, so they are simply allowed
* Writes are possibly dangerous, so they need a fake password
*/
static int mgmt_auth (const struct sockaddr_in sender_sock, enum n2n_mgmt_type type, char *auth, char *argv0, char *argv) {
if(auth) {
/* If we have an auth key, it must match */
if(0 == strcmp(auth,"CHANGEME")) {
return 1;
}
return 0;
}
/* if we dont have an auth key, we can still read */
if(type==N2N_MGMT_READ) {
return 1;
}
return 0;
}
void handleMgmtJson_sn (n2n_sn_t *sss, char *udp_buf, const struct sockaddr_in sender_sock) {
char cmdlinebuf[80];
enum n2n_mgmt_type type;
char *typechar;
char *options;
char *argv0;
char *argv;
char *tag;
char *flagstr;
int flags;
char *auth;
n2n_mgmt_handler_t *handler;
size_t msg_len;
/* save a copy of the commandline before we reuse the udp_buf */
strncpy(cmdlinebuf, udp_buf, sizeof(cmdlinebuf)-1);
cmdlinebuf[sizeof(cmdlinebuf)-1] = 0;
traceEvent(TRACE_DEBUG, "mgmt json %s", cmdlinebuf);
typechar = strtok(cmdlinebuf, " \r\n");
if(!typechar) {
/* should not happen */
mgmt_error(sss, udp_buf, sender_sock, "-1", "notype");
return;
}
if(*typechar == 'r') {
type=N2N_MGMT_READ;
} else if(*typechar == 'w') {
type=N2N_MGMT_WRITE;
} else {
/* dunno how we got here */
mgmt_error(sss, udp_buf, sender_sock, "-1", "badtype");
return;
}
/* Extract the tag to use in all reply packets */
options = strtok(NULL, " \r\n");
if(!options) {
mgmt_error(sss, udp_buf, sender_sock, "-1", "nooptions");
return;
}
argv0 = strtok(NULL, " \r\n");
if(!argv0) {
mgmt_error(sss, udp_buf, sender_sock, "-1", "nocmd");
return;
}
/*
* The entire rest of the line is the argv. We apply no processing
* or arg separation so that the cmd can use it however it needs.
*/
argv = strtok(NULL, "\r\n");
/*
* There might be an auth token mixed in with the tag
*/
tag = strtok(options, ":");
flagstr = strtok(NULL, ":");
if(flagstr) {
flags = strtoul(flagstr, NULL, 16);
} else {
flags = 0;
}
/* Only 1 flag bit defined at the moment - "auth option present" */
if(flags & 1) {
auth = strtok(NULL, ":");
} else {
auth = NULL;
}
if(!mgmt_auth(sender_sock, type, auth, argv0, argv)) {
mgmt_error(sss, udp_buf, sender_sock, tag, "badauth");
return;
}
for( handler=mgmt_handlers; handler->cmd; handler++ ) {
if(0 == strcmp(handler->cmd, argv0)) {
break;
}
}
if(!handler->cmd) {
mgmt_error(sss, udp_buf, sender_sock, tag, "unknowncmd");
return;
}
if((type==N2N_MGMT_WRITE) && !(handler->flags & FLAG_WROK)) {
mgmt_error(sss, udp_buf, sender_sock, tag, "readonly");
return;
}
/*
* TODO:
* The tag provided by the requester could contain chars
* that make our JSON invalid.
* - do we care?
*/
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"begin\",\"cmd\":\"%s\"}\n", tag, argv0);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
handler->func(sss, udp_buf, sender_sock, type, tag, argv0, argv);
msg_len = snprintf(udp_buf, N2N_PKT_BUF_SIZE,
"{\"_tag\":\"%s\",\"_type\":\"end\"}\n", tag);
sendto(sss->mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
return;
}

View File

@ -63,7 +63,7 @@ static int sort_communities (n2n_sn_t *sss,
static int process_mgmt (n2n_sn_t *sss,
const struct sockaddr_in *sender_sock,
const uint8_t *mgmt_buf,
char *mgmt_buf,
size_t mgmt_size,
time_t now);
@ -1493,7 +1493,7 @@ static int sort_communities (n2n_sn_t *sss,
static int process_mgmt (n2n_sn_t *sss,
const struct sockaddr_in *sender_sock,
const uint8_t *mgmt_buf,
char *mgmt_buf,
size_t mgmt_size,
time_t now) {
@ -1511,6 +1511,9 @@ static int process_mgmt (n2n_sn_t *sss,
traceEvent(TRACE_DEBUG, "process_mgmt");
/* avoid parsing any uninitialized junk from the stack */
mgmt_buf[mgmt_size] = 0;
// process input, if any
if((0 == memcmp(mgmt_buf, "help", 4)) || (0 == memcmp(mgmt_buf, "?", 1))) {
ressize += snprintf(resbuf + ressize, N2N_SN_PKTBUF_SIZE - ressize,
@ -1543,6 +1546,12 @@ static int process_mgmt (n2n_sn_t *sss,
return 0; /* no status output afterwards */
}
if((mgmt_buf[0] == 'r' || mgmt_buf[0] == 'w') && (mgmt_buf[1] == ' ')) {
/* this is a JSON request */
handleMgmtJson_sn(sss, mgmt_buf, *sender_sock);
return 0;
}
// output current status
ressize += snprintf(resbuf + ressize, N2N_SN_PKTBUF_SIZE - ressize,
@ -2690,7 +2699,7 @@ static int process_udp (n2n_sn_t * sss,
/** Long lived processing entry point. Split out from main to simply
* daemonisation on some platforms. */
int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
int run_sn_loop (n2n_sn_t *sss) {
uint8_t pktbuf[N2N_SN_PKTBUF_SIZE];
time_t last_purge_edges = 0;
@ -2699,7 +2708,7 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
sss->start_time = time(NULL);
while(*keep_running) {
while(*sss->keep_running) {
int rc;
ssize_t bread;
int max_sock;
@ -2764,7 +2773,7 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
#ifdef WIN32
traceEvent(TRACE_ERROR, "WSAGetLastError(): %u", WSAGetLastError());
#endif
*keep_running = 0;
*sss->keep_running = 0;
break;
}
@ -2880,16 +2889,16 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
if(bread <= 0) {
traceEvent(TRACE_ERROR, "recvfrom() failed %d errno %d (%s)", bread, errno, strerror(errno));
*keep_running = 0;
*sss->keep_running = 0;
break;
}
// we have a datagram to process
process_mgmt(sss, &sender_sock, pktbuf, bread, now);
process_mgmt(sss, &sender_sock, (char *)pktbuf, bread, now);
}
} else {
if(((now - before) < wait_time.tv_sec) && (*keep_running)){
if(((now - before) < wait_time.tv_sec) && (*sss->keep_running)){
// this is no real timeout, something went wrong with one of the tcp connections (probably)
// close them all, edges will re-open if they detect closure
traceEvent(TRACE_DEBUG, "falsly claimed timeout, assuming issue with tcp connection, closing them all");

View File

@ -658,5 +658,6 @@ int main (int argc, char * const argv[]) {
#endif
keep_running = 1;
return run_sn_loop(&sss_node, &keep_running);
sss_node.keep_running = &keep_running;
return run_sn_loop(&sss_node);
}