#!/usr/bin/env perl
use warnings;
use strict;

#
# Requires
#   libjson-perl
#
# Magic Markers
#
#%# family=auto
#%# capabilities=autoconf suggest

# Minimal client that sends tagged read requests over UDP to 127.0.0.1 and
# collects the JSON replies that carry the same tag.
package JsonUDP;
use warnings;
use strict;

use IO::Select;
use IO::Socket::INET;
use JSON;

# Create a client talking to 127.0.0.1:<port>.
#
# Arguments:
#   $port - UDP port to send requests to (defaults to 5644 when false/undef)
#
# Returns the blessed client object.  A failure to create the socket is not
# reported here; it surfaces as a die() on the first request, so callers that
# never issue a request are unaffected.
sub new {
    my $class = shift;
    my $port  = shift || 5644;

    my $self = {};
    bless($self, $class);

    $self->{sock} = IO::Socket::INET->new(
        PeerAddr => '127.0.0.1',
        PeerPort => $port,
        Proto    => 'udp',
    );
    $self->{json}  = JSON->new->utf8->relaxed->pretty->canonical;
    $self->{tag}   = 0;
    $self->{debug} = 0;

    # Seconds to wait for each reply packet before giving up
    $self->{timeout} = 5;

    return $self;
}

# Send one request line.  Dies if the socket could not be created in new().
# Returns whatever the socket's send() returns.
sub _tx {
    my $self    = shift;
    my $msgline = shift;

    my $sock = $self->{sock};
    if (!defined($sock)) {
        die("JsonUDP: no UDP socket available");
    }

    return $sock->send($msgline);
}

# Collect the reply packets for the request identified by $tag.
#
# Packets are read until one with _type 'end' arrives.  Packets of _type 'row'
# are accumulated (with the _tag and _type keys removed).
#
# Returns an array ref of row hashes, or nothing (undef in scalar context) if
# an 'error' packet was seen before the 'end' packet.
# Dies on receive timeout, on a socket error, or on undecodable JSON.
sub _rx {
    my $self = shift;
    my $tag  = shift;

    my $db = [];
    my $error;
    my $select = IO::Select->new($self->{sock});

    while (1) {
        # Bound the wait so that a lost or missing reply cannot hang forever
        my @ready = $select->can_read($self->{timeout});
        if (!@ready) {
            die("JsonUDP: timed out waiting for a reply");
        }

        my $jsontxt;
        if (!defined($self->{sock}->recv($jsontxt, 1024))) {
            die("JsonUDP: recv failed: $!");
        }

        if ($self->{debug}) {
            # Debug output must not be mixed into the data printed on STDOUT
            print STDERR $jsontxt;
        }

        my $msg = $self->{json}->decode($jsontxt);

        # Only JSON objects can carry the _tag/_type envelope
        next if (ref($msg) ne 'HASH');

        my $msg_tag  = $msg->{_tag};
        my $msg_type = $msg->{_type};

        # ignore packets not for us
        next if (!defined($msg_tag) || $msg_tag ne $tag);

        # A packet without a _type is treated like an unknown _type
        next if (!defined($msg_type));

        # Save most recent error for return
        if ($msg_type eq 'error') {
            $error = $msg;
            next;
        }

        if ($msg_type eq 'end') {
            if ($error) {
                # TODO: an error channel
                return;
            }
            return $db;
        }

        if ($msg_type eq 'row') {
            delete $msg->{_tag};
            delete $msg->{_type};
            push @$db, $msg;
            next;
        }

        # Ignore any unknown _type
    }
}

# Issue a read request ("r <tag> <cmdline>") and return its result.
#
# Arguments:
#   $cmdline - the command text appended to the request
#
# Returns what _rx() returns: an array ref of rows, or undef on an error reply.
sub read {
    my $self    = shift;
    my $cmdline = shift;
    my $tag     = $self->{tag}++;

    # TODO:
    # Add a read cache

    $self->_tx(sprintf("r %i %s", $tag, $cmdline));
    return $self->_rx($tag);
}

1;

package main;
use warnings;
use strict;

# Per-graph field definitions.  Every attribute of every field is printed by
# do_config() as "<field>.<attribute> <value>".
my $config = {
    edge_pkts => {
        p2p_tx_pkt => {
            label => 'Peer to Peer tx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        p2p_rx_pkt => {
            label => 'Peer to Peer rx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        super_tx_pkt => {
            label => 'Peer to Supernode tx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        super_rx_pkt => {
            label => 'Peer to Supernode rx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        super_broadcast_tx_pkt => {
            label => 'Broadcast to Supernode tx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        super_broadcast_rx_pkt => {
            label => 'Broadcast to Supernode rx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        transop_tx_pkt => {
            label => 'Transform tx rate',
            type  => 'DERIVE',
            min   => 0,
        },
        transop_rx_pkt => {
            label => 'Transform rx rate',
            type  => 'DERIVE',
            min   => 0,
        },
    },
    edge_counts => {
        edges => {
            label => 'Current known peers',
            type  => 'GAUGE',
        },
        supernodes => {
            label => 'Current known supernodes',
            type  => 'GAUGE',
        },
    },
    supernode_pkts => {
        errors_tx_pkt => {
            label => 'Error rate',
            type  => 'DERIVE',
            min   => 0,
        },
        reg_super_rx_pkt => {
            label => 'Connect rate',
            type  => 'DERIVE',
            min   => 0,
        },
        reg_super_nak => {
            label => 'Connect error rate',
            type  => 'DERIVE',
            min   => 0,
        },
        forward_tx_pkt => {
            label => 'Packets forwarded rate',
            type  => 'DERIVE',
            min   => 0,
        },
        broadcast_tx_pkt => {
            label => 'Broadcast packet rate',
            type  => 'DERIVE',
            min   => 0,
        },
    },
    supernode_counts => {
        edges => {
            label => 'Current known edges',
            type  => 'GAUGE',
        },
        communities => {
            label => 'Current known communities',
            type  => 'GAUGE',
        },
    },
};

# Per-graph data sources: the UDP port to query, and either one table whose
# rows are printed ("read") or a list of tables whose rows are counted
# ("count").
my $fetchinfo = {
    edge_pkts => {
        port => 5644,
        read => "packetstats",
    },
    edge_counts => {
        port  => 5644,
        count => [
            "edges",
            "supernodes",
        ],
    },
    supernode_pkts => {
        port => 5645,
        read => "packetstats",
    },
    supernode_counts => {
        port  => 5645,
        count => [
            "edges",
            "communities",
        ],
    },
};

# Print the graph configuration for graph $name.
#
# Arguments:
#   $rpc  - JsonUDP client (unused here; passed to every sub command)
#   $name - graph name, a key of $config
sub do_config {
    my $rpc  = shift;
    my $name = shift;

    print("graph_title n2n $name status\n");
    print("graph_category network\n");

    my $fields = $config->{$name} || {};

    # Sorted iteration keeps the whole output stable between runs
    my @names = sort(keys(%{$fields}));
    for my $fieldname (@names) {
        my $field = $fields->{$fieldname};
        for my $key (sort(keys(%{$field}))) {
            print($fieldname.'.'.$key, " ", $field->{$key}, "\n");
        }
    }

    # Ensure stable order
    print("graph_order ", join(' ', @names), "\n");
}

# Query the daemon and print the current values for graph $name.
#
# For a "read" table every row is printed as "<type>_<key>.value <val>", where
# <type> is the row's own "type" entry.  For each "count" table the number of
# returned rows is printed as "<table>.value <n>".
#
# Arguments:
#   $rpc  - JsonUDP client used for the queries
#   $name - graph name, a key of $fetchinfo
sub do_fetch {
    my $rpc  = shift;
    my $name = shift;

    my $info = $fetchinfo->{$name} || {};

    my $read_table = $info->{read};
    if (defined($read_table)) {
        my $db = $rpc->read($read_table);
        if (!defined($db)) {
            # read() returns undef when the daemon answered with an error
            warn("n2n: error reply while reading $read_table\n");
            $db = [];
        }

        for my $row (@{$db}) {
            my $type = delete $row->{type};
            for my $key (sort(keys(%{$row}))) {
                my $metricname = $type."_".$key;
                print($metricname, ".value ", $row->{$key}, "\n");
            }
        }
    }

    my $count_tables = $info->{count};
    if (defined($count_tables)) {
        for my $table (@{$count_tables}) {
            my $db = $rpc->read($table);
            if (defined($db)) {
                print($table, ".value ", scalar(@{$db}), "\n");
            }
            else {
                # Report "unknown" rather than dying on an undefined array ref
                warn("n2n: error reply while reading $table\n");
                print($table, ".value U\n");
            }
        }
    }
}

# Print "yes" if a process matching "supernode" or "edge" is found by pgrep,
# otherwise print "no" with a reason.
sub do_autoconf {
    # quick check to see if this plugin should be enabled
    # NOTE(review): pgrep matches substrings, so unrelated processes whose
    # name contains "edge" also count - confirm whether that is acceptable
    if (`pgrep supernode`) {
        print("yes\n");
    }
    elsif (`pgrep edge`) {
        print("yes\n");
    }
    else {
        print("no - neither edge nor supernode are running\n");
    }
}

# Print the names of all graphs whose port belongs to a daemon that pgrep
# finds running (supernode => 5645, edge => 5644), one per line.
sub do_suggest {
    my $ports = {};
    if (`pgrep supernode`) {
        $ports->{5645} = 1;
    }
    if (`pgrep edge`) {
        $ports->{5644} = 1;
    }

    for my $name (sort(keys(%{$fetchinfo}))) {
        my $port = $fetchinfo->{$name}->{port};
        next if (!defined($port));            # this not a real fetchinfo
        next if (!defined($ports->{$port}));  # not linked to a running daemon
        print($name, "\n");
    }
}

# Dispatch table: sub command name => handler, each called as ($rpc, $name)
my $subc = {
    'fetch'    => \&do_fetch,
    'config'   => \&do_config,
    'autoconf' => \&do_autoconf,
    'suggest'  => \&do_suggest,
};

# Entry point.
#
# The graph name is taken from $ARGV[1] or, failing that, from the part of
# the program path that follows "n2n_".  The sub command is $ARGV[0] and
# defaults to "fetch".  Dies on an unknown sub command.
sub main {
    my $name = $ARGV[1] || $0;
    $name =~ s%^.*/n2n_([^/]+)%$1%;

    # Look the port up without autovivifying an entry for an unknown name
    my $port;
    if (exists($fetchinfo->{$name})) {
        $port = $fetchinfo->{$name}->{port};
    }
    my $rpc = JsonUDP->new($port);

    my $cmd = $ARGV[0];
    if (!defined($cmd)) {
        $cmd = 'fetch';
    }

    my $func = $subc->{$cmd};
    if (!defined($func)) {
        die("bad sub command");
    }

    return $func->($rpc, $name);
}
main();