revised tcp implementation (#674)

This commit is contained in:
Logan oos Even 2021-03-26 20:45:31 +05:45 committed by GitHub
parent b949b23e2a
commit 7c3951a10f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 19 deletions

View File

@ -229,14 +229,14 @@ int time_stamp_verify_and_update (uint64_t stamp, uint64_t * previous_stamp, int
/* Operations on peer_info lists. */ /* Operations on peer_info lists. */
size_t purge_peer_list (struct peer_info ** peer_list, size_t purge_peer_list (struct peer_info ** peer_list,
SOCKET socket_not_to_close, SOCKET socket_not_to_close,
n2n_tcp_connection_t *tcp_connections, n2n_tcp_connection_t **tcp_connections,
time_t purge_before); time_t purge_before);
size_t clear_peer_list (struct peer_info ** peer_list); size_t clear_peer_list (struct peer_info ** peer_list);
size_t purge_expired_nodes (struct peer_info **peer_list, size_t purge_expired_nodes (struct peer_info **peer_list,
SOCKET socket_not_to_close, SOCKET socket_not_to_close,
n2n_tcp_connection_t *tcp_connections, n2n_tcp_connection_t **tcp_connections,
time_t *p_last_purge, time_t *p_last_purge,
int frequency, int timeout); int frequency, int timeout);

View File

@ -711,8 +711,9 @@ typedef struct n2n_tcp_connection {
struct sockaddr sock; /* network order socket */ struct sockaddr sock; /* network order socket */
uint16_t expected; /* number of bytes expected to be read */ uint16_t expected; /* number of bytes expected to be read */
uint16_t position; /* current position in the buffer*/ uint16_t position; /* current position in the buffer */
uint8_t buffer[N2N_PKT_BUF_SIZE + sizeof(uint16_t)]; /* buffer for data collected from tcp socket incl. prepended length */ uint8_t buffer[N2N_PKT_BUF_SIZE + sizeof(uint16_t)]; /* buffer for data collected from tcp socket incl. prepended length */
uint8_t inactive; /* connection not be handled if set, already closed and to be deleted soon */
UT_hash_handle hh; /* makes this structure hashable */ UT_hash_handle hh; /* makes this structure hashable */
} n2n_tcp_connection_t; } n2n_tcp_connection_t;

View File

@ -440,7 +440,7 @@ void print_n2n_version () {
size_t purge_expired_nodes (struct peer_info **peer_list, size_t purge_expired_nodes (struct peer_info **peer_list,
SOCKET socket_not_to_close, SOCKET socket_not_to_close,
n2n_tcp_connection_t *tcp_connections, n2n_tcp_connection_t **tcp_connections,
time_t *p_last_purge, time_t *p_last_purge,
int frequency, int timeout) { int frequency, int timeout) {
@ -463,9 +463,9 @@ size_t purge_expired_nodes (struct peer_info **peer_list,
/** Purge old items from the peer_list, eventually close the related socket, and /** Purge old items from the peer_list, eventually close the related socket, and
* return the number of items that were removed. */ * return the number of items that were removed. */
size_t purge_peer_list (struct peer_info ** peer_list, size_t purge_peer_list (struct peer_info **peer_list,
SOCKET socket_not_to_close, SOCKET socket_not_to_close,
n2n_tcp_connection_t *tcp_connections, n2n_tcp_connection_t **tcp_connections,
time_t purge_before) { time_t purge_before) {
struct peer_info *scan, *tmp; struct peer_info *scan, *tmp;
@ -476,9 +476,9 @@ size_t purge_peer_list (struct peer_info ** peer_list,
if((scan->purgeable == SN_PURGEABLE) && (scan->last_seen < purge_before)) { if((scan->purgeable == SN_PURGEABLE) && (scan->last_seen < purge_before)) {
if((scan->socket_fd >=0) && (scan->socket_fd != socket_not_to_close)) { if((scan->socket_fd >=0) && (scan->socket_fd != socket_not_to_close)) {
if(tcp_connections) { if(tcp_connections) {
HASH_FIND_INT(tcp_connections, &scan->socket_fd, conn); HASH_FIND_INT(*tcp_connections, &scan->socket_fd, conn);
if(conn) { if(conn) {
HASH_DEL(tcp_connections, conn); HASH_DEL(*tcp_connections, conn);
free(conn); free(conn);
} }
shutdown(scan->socket_fd, SHUT_RDWR); shutdown(scan->socket_fd, SHUT_RDWR);

View File

@ -91,9 +91,8 @@ static void close_tcp_connection(n2n_sn_t *sss, n2n_tcp_connection_t *conn) {
// close the connection // close the connection
shutdown(conn->socket_fd, SHUT_RDWR); shutdown(conn->socket_fd, SHUT_RDWR);
closesocket(conn->socket_fd); closesocket(conn->socket_fd);
// forget about the connection // forget about the connection, will be deleted later
HASH_DEL(sss->tcp_connections, conn); conn->inactive = 1;
free(conn);
} }
@ -862,7 +861,7 @@ static int re_register_and_purge_supernodes (n2n_sn_t *sss, struct sn_community
} }
// purge long-time-not-seen supernodes // purge long-time-not-seen supernodes
purge_expired_nodes(&(comm->edges), sss->sock, sss->tcp_connections, p_last_re_reg_and_purge, purge_expired_nodes(&(comm->edges), sss->sock, &sss->tcp_connections, p_last_re_reg_and_purge,
RE_REG_AND_PURGE_FREQUENCY, LAST_SEEN_SN_INACTIVE); RE_REG_AND_PURGE_FREQUENCY, LAST_SEEN_SN_INACTIVE);
if(comm != NULL) { if(comm != NULL) {
@ -943,7 +942,7 @@ static int purge_expired_communities (n2n_sn_t *sss,
continue; continue;
// purge the community's local peers // purge the community's local peers
num_reg += purge_peer_list(&comm->edges, sss->sock, sss->tcp_connections, now - REGISTRATION_TIMEOUT); num_reg += purge_peer_list(&comm->edges, sss->sock, &sss->tcp_connections, now - REGISTRATION_TIMEOUT);
// purge the community's associated peers (connected to other supernodes) // purge the community's associated peers (connected to other supernodes)
HASH_ITER(hh, comm->assoc, assoc, tmp_assoc) { HASH_ITER(hh, comm->assoc, assoc, tmp_assoc) {
@ -2136,12 +2135,16 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
#ifdef N2N_HAVE_TCP #ifdef N2N_HAVE_TCP
// the so far known tcp connections // the so far known tcp connections
// do NOT use 'HASH_ITER(hh, sss->tcp_connections, conn, tmp_conn) {' to iterate because
// deletion of OTHER connections (that can happen if forwarding to another edge node fails) // beware: current conn and other items of the connection list may be found
// may result in seg faults if using HASH ITER which is only safe to use if deleting current item // due for deletion while processing packets. Even OTHER connections, e.g. if
for(conn = sss->tcp_connections; conn != NULL; conn = next) { // forwarding to another edge node fails. connections due for deletion will
// conn might not be available next iteration due to its deletion so we store 'next' upfront // not immediately be deleted but marked 'inactive' for later deletion
next = (n2n_tcp_connection_t*)(conn->hh.next); HASH_ITER(hh, sss->tcp_connections, conn, tmp_conn) {
// do not process entries that have been marked inactive, those will be deleted
// immediately after this loop
if(conn->inactive)
continue;
if(FD_ISSET(conn->socket_fd, &socket_mask)) { if(FD_ISSET(conn->socket_fd, &socket_mask)) {
@ -2191,6 +2194,14 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
} }
} }
// remove inactive / already closed tcp connections from list
HASH_ITER(hh, sss->tcp_connections, conn, tmp_conn) {
if(conn->inactive) {
HASH_DEL(sss->tcp_connections, conn);
free(conn);
}
}
// accept new incoming tcp connection // accept new incoming tcp connection
if(FD_ISSET(sss->tcp_sock, &socket_mask)) { if(FD_ISSET(sss->tcp_sock, &socket_mask)) {
struct sockaddr_in sender_sock; struct sockaddr_in sender_sock;
@ -2204,6 +2215,7 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) {
if(conn) { if(conn) {
conn->socket_fd = tmp_sock; conn->socket_fd = tmp_sock;
memcpy(&(conn->sock), &sender_sock, sizeof(struct sockaddr_in)); memcpy(&(conn->sock), &sender_sock, sizeof(struct sockaddr_in));
conn->inactive = 0;
conn->expected = sizeof(uint16_t); conn->expected = sizeof(uint16_t);
conn->position = 0; conn->position = 0;
HASH_ADD_INT(sss->tcp_connections, socket_fd, conn); HASH_ADD_INT(sss->tcp_connections, socket_fd, conn);