summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--flowtop.c228
1 files changed, 202 insertions, 26 deletions
diff --git a/flowtop.c b/flowtop.c
index e45650f..fb42bc4 100644
--- a/flowtop.c
+++ b/flowtop.c
@@ -23,6 +23,8 @@
#include <urcu.h>
#include <libgen.h>
#include <inttypes.h>
+#include <poll.h>
+#include <fcntl.h>
#include "die.h"
#include "xmalloc.h"
@@ -53,6 +55,8 @@ struct flow_entry {
struct flow_entry *next;
int inode;
unsigned int procnum;
+ bool is_visible;
+ struct nf_conntrack *ct;
};
struct flow_list {
@@ -81,6 +85,7 @@ static volatile sig_atomic_t sigint = 0;
static int what = INCLUDE_IPV4 | INCLUDE_IPV6 | INCLUDE_TCP, show_src = 0;
static struct flow_list flow_list;
static struct condlock collector_ready;
+static int nfct_acct_val = -1;
static const char *short_options = "vhTUsDIS46u";
static const struct option long_options[] = {
@@ -217,6 +222,62 @@ static const struct nfct_filter_ipv6 filter_ipv6 = {
.mask = { 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff },
};
+#define SYS_PATH "/proc/sys/"
+
+static int sysctl_set_int(char *file, int value)
+{
+ char path[PATH_MAX];
+ char str[64];
+ ssize_t ret;
+ int fd;
+
+ path[0] = '\0';
+ strcat(path, SYS_PATH);
+ strncat(path, file, PATH_MAX - sizeof(SYS_PATH) - 1);
+
+ fd = open(path, O_WRONLY);
+ if (unlikely(fd < 0))
+ return -1;
+
+ ret = snprintf(str, 63, "%d", value);
+ if (ret < 0) {
+ close(fd);
+ return -1;
+ }
+
+ ret = write(fd, str, strlen(str));
+
+ close(fd);
+ return ret <= 0 ? -1 : 0;
+}
+
+static int sysctl_get_int(char *file, int *value)
+{
+ char path[PATH_MAX];
+ char str[64];
+ ssize_t ret;
+ int fd;
+
+ path[0] = '\0';
+ strcat(path, SYS_PATH);
+ strncat(path, file, PATH_MAX - sizeof(SYS_PATH) - 1);
+
+ fd = open(path, O_RDONLY);
+ if (fd < 0)
+ return -1;
+
+ ret = read(fd, str, sizeof(str));
+ if (ret > 0) {
+ *value = atoi(str);
+ ret = 0;
+ } else {
+ ret = -1;
+ }
+
+ close(fd);
+ return ret;
+}
+
static void signal_handler(int number)
{
switch (number) {
@@ -279,6 +340,9 @@ static inline struct flow_entry *flow_entry_xalloc(void)
static inline void flow_entry_xfree(struct flow_entry *n)
{
+ if (n->ct)
+ nfct_destroy(n->ct);
+
xfree(n);
}
@@ -292,6 +356,8 @@ static void flow_list_new_entry(struct flow_list *fl, struct nf_conntrack *ct)
{
struct flow_entry *n = flow_entry_xalloc();
+ n->ct = nfct_clone(ct);
+
flow_entry_from_ct(n, ct);
flow_entry_get_extended(n);
@@ -335,22 +401,15 @@ static struct flow_entry *flow_list_find_prev_id(struct flow_list *fl,
static void flow_list_update_entry(struct flow_list *fl,
struct nf_conntrack *ct)
{
- int do_ext = 0;
struct flow_entry *n;
n = flow_list_find_id(fl, nfct_get_attr_u32(ct, ATTR_ID));
if (n == NULL) {
- n = flow_entry_xalloc();
- do_ext = 1;
+ flow_list_new_entry(fl, ct);
+ return;
}
flow_entry_from_ct(n, ct);
- if (do_ext) {
- flow_entry_get_extended(n);
-
- rcu_assign_pointer(n->next, fl->head);
- rcu_assign_pointer(fl->head, n);
- }
}
static void flow_list_destroy_entry(struct flow_list *fl,
@@ -925,22 +984,30 @@ static void presenter_screen_update(WINDOW *screen, struct flow_list *fl,
mvwprintw(screen, line, 2, "(No active sessions! "
"Is netfilter running?)");
- for (; n && maxy > 0; n = rcu_dereference(n->next)) {
+ for (; n; n = rcu_dereference(n->next)) {
+ if (maxy <= 0)
+ goto skip;
+
if (presenter_get_port(n->port_src, n->port_dst, 0) == 53)
- continue;
+ goto skip;
if (presenter_flow_wrong_state(n))
- continue;
+ goto skip;
if (skip_lines > 0) {
skip_lines--;
- continue;
+ goto skip;
}
presenter_screen_do_line(screen, n, &line);
line++;
maxy -= (2 + 1 * show_src);
+ n->is_visible = true;
+ continue;
+skip:
+ n->is_visible = false;
+ continue;
}
rcu_read_unlock();
@@ -986,7 +1053,7 @@ static void presenter(void)
}
presenter_screen_update(screen, &flow_list, skip_lines);
- usleep(100000);
+ usleep(200000);
}
rcu_unregister_thread();
@@ -1028,26 +1095,92 @@ static inline void collector_flush(struct nfct_handle *handle, uint8_t family)
nfct_query(handle, NFCT_Q_FLUSH, &family);
}
+static void restore_sysctl(void *value)
+{
+ int int_val = *(int *)value;
+
+ if (int_val == 0)
+ sysctl_set_int("net/netfilter/nf_conntrack_acct", int_val);
+}
+
+static void on_panic_handler(void *arg)
+{
+ restore_sysctl(arg);
+ screen_end();
+}
+
+static void conntrack_acct_enable(void)
+{
+ /* We can still work w/o traffic accounting so just warn about error */
+ if (sysctl_get_int("net/netfilter/nf_conntrack_acct", &nfct_acct_val)) {
+ fprintf(stderr, "Can't read net/netfilter/nf_conntrack_acct: %s\n",
+ strerror(errno));
+ }
+
+ if (nfct_acct_val == 1)
+ return;
+
+ if (sysctl_set_int("net/netfilter/nf_conntrack_acct", 1)) {
+ fprintf(stderr, "Can't write net/netfilter/nf_conntrack_acct: %s\n",
+ strerror(errno));
+ }
+}
+
+static int dump_cb(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct, void *data __maybe_unused)
+{
+ struct flow_entry *n;
+
+ if (type != NFCT_T_UPDATE)
+ return NFCT_CB_CONTINUE;
+
+ if (sigint)
+ return NFCT_CB_STOP;
+
+ n = flow_list_find_id(&flow_list, nfct_get_attr_u32(ct, ATTR_ID));
+ if (!n)
+ return NFCT_CB_CONTINUE;
+
+ flow_entry_from_ct(n, ct);
+
+ return NFCT_CB_CONTINUE;
+}
+
+static void collector_refresh_flows(struct nfct_handle *handle)
+{
+ struct flow_entry *n;
+
+ n = rcu_dereference(flow_list.head);
+ for (; n; n = rcu_dereference(n->next)) {
+ if (!n->is_visible)
+ continue;
+
+ nfct_query(handle, NFCT_Q_GET, n->ct);
+ }
+}
+
static void *collector(void *null __maybe_unused)
{
- int ret;
- struct nfct_handle *handle;
+ struct nfct_handle *ct_event;
+ struct nfct_handle *ct_dump;
struct nfct_filter *filter;
+ struct pollfd poll_fd[1];
+ int ret;
- handle = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
+ ct_event = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
NF_NETLINK_CONNTRACK_UPDATE |
NF_NETLINK_CONNTRACK_DESTROY);
- if (!handle)
+ if (!ct_event)
panic("Cannot create a nfct handle: %s\n", strerror(errno));
- collector_flush(handle, AF_INET);
- collector_flush(handle, AF_INET6);
+ collector_flush(ct_event, AF_INET);
+ collector_flush(ct_event, AF_INET6);
filter = nfct_filter_create();
if (!filter)
panic("Cannot create a nfct filter: %s\n", strerror(errno));
- ret = nfct_filter_attach(nfct_fd(handle), filter);
+ ret = nfct_filter_attach(nfct_fd(ct_event), filter);
if (ret < 0)
panic("Cannot attach filter to handle: %s\n", strerror(errno));
@@ -1074,25 +1207,62 @@ static void *collector(void *null __maybe_unused)
nfct_filter_add_attr(filter, NFCT_FILTER_SRC_IPV6, &filter_ipv6);
}
- ret = nfct_filter_attach(nfct_fd(handle), filter);
+ ret = nfct_filter_attach(nfct_fd(ct_event), filter);
if (ret < 0)
panic("Cannot attach filter to handle: %s\n", strerror(errno));
- nfct_callback_register(handle, NFCT_T_ALL, collector_cb, NULL);
nfct_filter_destroy(filter);
+
+ nfct_callback_register(ct_event, NFCT_T_ALL, collector_cb, NULL);
flow_list_init(&flow_list);
+ ct_dump = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_UPDATE);
+ if (!ct_dump)
+ panic("Cannot create a nfct handle: %s\n", strerror(errno));
+
+ nfct_callback_register(ct_dump, NFCT_T_ALL, dump_cb, NULL);
+
+ poll_fd[0].fd = nfct_fd(ct_event);
+ poll_fd[0].events = POLLIN;
+
+ if (fcntl(nfct_fd(ct_event), F_SETFL, O_NONBLOCK) == -1)
+ panic("Cannot set non-blocking socket: fcntl(): %s\n",
+ strerror(errno));
+
+ if (fcntl(nfct_fd(ct_dump), F_SETFL, O_NONBLOCK) == -1)
+ panic("Cannot set non-blocking socket: fcntl(): %s\n",
+ strerror(errno));
+
condlock_signal(&collector_ready);
rcu_register_thread();
- while (!sigint && ret >= 0)
- ret = nfct_catch(handle);
+ while (!sigint && ret >= 0) {
+ int status;
+
+ usleep(300000);
+
+ collector_refresh_flows(ct_dump);
+
+ status = poll(poll_fd, 1, 0);
+ if (status < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+
+ panic("Error while polling: %s\n", strerror(errno));
+ } else if (status == 0) {
+ continue;
+ }
+
+ if (poll_fd[0].revents & POLLIN)
+ nfct_catch(ct_event);
+ }
rcu_unregister_thread();
flow_list_destroy(&flow_list);
- nfct_close(handle);
+ nfct_close(ct_event);
+ nfct_close(ct_dump);
pthread_exit(NULL);
}
@@ -1157,6 +1327,10 @@ int main(int argc, char **argv)
register_signal(SIGTERM, signal_handler);
register_signal(SIGHUP, signal_handler);
+ panic_handler_add(on_panic_handler, &nfct_acct_val);
+
+ conntrack_acct_enable();
+
init_geoip(1);
condlock_init(&collector_ready);
@@ -1171,5 +1345,7 @@ int main(int argc, char **argv)
destroy_geoip();
+ restore_sysctl(&nfct_acct_val);
+
return 0;
}