summaryrefslogtreecommitdiff
path: root/flowtop.c
diff options
context:
space:
mode:
authorVadim Kochan <vadim4j@gmail.com>2015-07-13 23:03:07 +0300
committerTobias Klauser <tklauser@distanz.ch>2015-07-17 12:03:26 +0200
commit6d2aa3dae7c2dc95f882786f5629dde05b91e5dc (patch)
tree669cc46ae24612171bed699db2ff5c7d4110e109 /flowtop.c
parentd53674ad684caa3914064a0805c68de8f3b03d4e (diff)
flowtop: Add connection traffic accounting
Mark each flow if it is visible on the screen to know if it is needed update traffic acct info. Changed to use non blocking recv of nf conntrack events to update traffic accounting. Now nf_conntrack is cloned when new flow entry is added to send dump request which is used to update traffic accounting info (packet, bytes). Signed-off-by: Vadim Kochan <vadim4j@gmail.com> [tklauser: Formatting changes] Signed-off-by: Tobias Klauser <tklauser@distanz.ch>
Diffstat (limited to 'flowtop.c')
-rw-r--r--flowtop.c228
1 files changed, 202 insertions, 26 deletions
diff --git a/flowtop.c b/flowtop.c
index e45650f..fb42bc4 100644
--- a/flowtop.c
+++ b/flowtop.c
@@ -23,6 +23,8 @@
#include <urcu.h>
#include <libgen.h>
#include <inttypes.h>
+#include <poll.h>
+#include <fcntl.h>
#include "die.h"
#include "xmalloc.h"
@@ -53,6 +55,8 @@ struct flow_entry {
struct flow_entry *next;
int inode;
unsigned int procnum;
+ bool is_visible;
+ struct nf_conntrack *ct;
};
struct flow_list {
@@ -81,6 +85,7 @@ static volatile sig_atomic_t sigint = 0;
static int what = INCLUDE_IPV4 | INCLUDE_IPV6 | INCLUDE_TCP, show_src = 0;
static struct flow_list flow_list;
static struct condlock collector_ready;
+static int nfct_acct_val = -1;
static const char *short_options = "vhTUsDIS46u";
static const struct option long_options[] = {
@@ -217,6 +222,62 @@ static const struct nfct_filter_ipv6 filter_ipv6 = {
.mask = { 0xffffffff, 0xffffffff, 0xffffffff, 0xffffffff },
};
+#define SYS_PATH "/proc/sys/"
+
+static int sysctl_set_int(char *file, int value)
+{
+ char path[PATH_MAX];
+ char str[64];
+ ssize_t ret;
+ int fd;
+
+ path[0] = '\0';
+ strcat(path, SYS_PATH);
+ strncat(path, file, PATH_MAX - sizeof(SYS_PATH) - 1);
+
+ fd = open(path, O_WRONLY);
+ if (unlikely(fd < 0))
+ return -1;
+
+ ret = snprintf(str, 63, "%d", value);
+ if (ret < 0) {
+ close(fd);
+ return -1;
+ }
+
+ ret = write(fd, str, strlen(str));
+
+ close(fd);
+ return ret <= 0 ? -1 : 0;
+}
+
+static int sysctl_get_int(char *file, int *value)
+{
+ char path[PATH_MAX];
+ char str[64];
+ ssize_t ret;
+ int fd;
+
+ path[0] = '\0';
+ strcat(path, SYS_PATH);
+ strncat(path, file, PATH_MAX - sizeof(SYS_PATH) - 1);
+
+ fd = open(path, O_RDONLY);
+ if (fd < 0)
+ return -1;
+
+ ret = read(fd, str, sizeof(str));
+ if (ret > 0) {
+ *value = atoi(str);
+ ret = 0;
+ } else {
+ ret = -1;
+ }
+
+ close(fd);
+ return ret;
+}
+
static void signal_handler(int number)
{
switch (number) {
@@ -279,6 +340,9 @@ static inline struct flow_entry *flow_entry_xalloc(void)
static inline void flow_entry_xfree(struct flow_entry *n)
{
+ if (n->ct)
+ nfct_destroy(n->ct);
+
xfree(n);
}
@@ -292,6 +356,8 @@ static void flow_list_new_entry(struct flow_list *fl, struct nf_conntrack *ct)
{
struct flow_entry *n = flow_entry_xalloc();
+ n->ct = nfct_clone(ct);
+
flow_entry_from_ct(n, ct);
flow_entry_get_extended(n);
@@ -335,22 +401,15 @@ static struct flow_entry *flow_list_find_prev_id(struct flow_list *fl,
static void flow_list_update_entry(struct flow_list *fl,
struct nf_conntrack *ct)
{
- int do_ext = 0;
struct flow_entry *n;
n = flow_list_find_id(fl, nfct_get_attr_u32(ct, ATTR_ID));
if (n == NULL) {
- n = flow_entry_xalloc();
- do_ext = 1;
+ flow_list_new_entry(fl, ct);
+ return;
}
flow_entry_from_ct(n, ct);
- if (do_ext) {
- flow_entry_get_extended(n);
-
- rcu_assign_pointer(n->next, fl->head);
- rcu_assign_pointer(fl->head, n);
- }
}
static void flow_list_destroy_entry(struct flow_list *fl,
@@ -925,22 +984,30 @@ static void presenter_screen_update(WINDOW *screen, struct flow_list *fl,
mvwprintw(screen, line, 2, "(No active sessions! "
"Is netfilter running?)");
- for (; n && maxy > 0; n = rcu_dereference(n->next)) {
+ for (; n; n = rcu_dereference(n->next)) {
+ if (maxy <= 0)
+ goto skip;
+
if (presenter_get_port(n->port_src, n->port_dst, 0) == 53)
- continue;
+ goto skip;
if (presenter_flow_wrong_state(n))
- continue;
+ goto skip;
if (skip_lines > 0) {
skip_lines--;
- continue;
+ goto skip;
}
presenter_screen_do_line(screen, n, &line);
line++;
maxy -= (2 + 1 * show_src);
+ n->is_visible = true;
+ continue;
+skip:
+ n->is_visible = false;
+ continue;
}
rcu_read_unlock();
@@ -986,7 +1053,7 @@ static void presenter(void)
}
presenter_screen_update(screen, &flow_list, skip_lines);
- usleep(100000);
+ usleep(200000);
}
rcu_unregister_thread();
@@ -1028,26 +1095,92 @@ static inline void collector_flush(struct nfct_handle *handle, uint8_t family)
nfct_query(handle, NFCT_Q_FLUSH, &family);
}
+static void restore_sysctl(void *value)
+{
+ int int_val = *(int *)value;
+
+ if (int_val == 0)
+ sysctl_set_int("net/netfilter/nf_conntrack_acct", int_val);
+}
+
+static void on_panic_handler(void *arg)
+{
+ restore_sysctl(arg);
+ screen_end();
+}
+
+static void conntrack_acct_enable(void)
+{
+ /* We can still work w/o traffic accounting so just warn about error */
+ if (sysctl_get_int("net/netfilter/nf_conntrack_acct", &nfct_acct_val)) {
+ fprintf(stderr, "Can't read net/netfilter/nf_conntrack_acct: %s\n",
+ strerror(errno));
+ }
+
+ if (nfct_acct_val == 1)
+ return;
+
+ if (sysctl_set_int("net/netfilter/nf_conntrack_acct", 1)) {
+ fprintf(stderr, "Can't write net/netfilter/nf_conntrack_acct: %s\n",
+ strerror(errno));
+ }
+}
+
+static int dump_cb(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct, void *data __maybe_unused)
+{
+ struct flow_entry *n;
+
+ if (type != NFCT_T_UPDATE)
+ return NFCT_CB_CONTINUE;
+
+ if (sigint)
+ return NFCT_CB_STOP;
+
+ n = flow_list_find_id(&flow_list, nfct_get_attr_u32(ct, ATTR_ID));
+ if (!n)
+ return NFCT_CB_CONTINUE;
+
+ flow_entry_from_ct(n, ct);
+
+ return NFCT_CB_CONTINUE;
+}
+
+static void collector_refresh_flows(struct nfct_handle *handle)
+{
+ struct flow_entry *n;
+
+ n = rcu_dereference(flow_list.head);
+ for (; n; n = rcu_dereference(n->next)) {
+ if (!n->is_visible)
+ continue;
+
+ nfct_query(handle, NFCT_Q_GET, n->ct);
+ }
+}
+
static void *collector(void *null __maybe_unused)
{
- int ret;
- struct nfct_handle *handle;
+ struct nfct_handle *ct_event;
+ struct nfct_handle *ct_dump;
struct nfct_filter *filter;
+ struct pollfd poll_fd[1];
+ int ret;
- handle = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
+ ct_event = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
NF_NETLINK_CONNTRACK_UPDATE |
NF_NETLINK_CONNTRACK_DESTROY);
- if (!handle)
+ if (!ct_event)
panic("Cannot create a nfct handle: %s\n", strerror(errno));
- collector_flush(handle, AF_INET);
- collector_flush(handle, AF_INET6);
+ collector_flush(ct_event, AF_INET);
+ collector_flush(ct_event, AF_INET6);
filter = nfct_filter_create();
if (!filter)
panic("Cannot create a nfct filter: %s\n", strerror(errno));
- ret = nfct_filter_attach(nfct_fd(handle), filter);
+ ret = nfct_filter_attach(nfct_fd(ct_event), filter);
if (ret < 0)
panic("Cannot attach filter to handle: %s\n", strerror(errno));
@@ -1074,25 +1207,62 @@ static void *collector(void *null __maybe_unused)
nfct_filter_add_attr(filter, NFCT_FILTER_SRC_IPV6, &filter_ipv6);
}
- ret = nfct_filter_attach(nfct_fd(handle), filter);
+ ret = nfct_filter_attach(nfct_fd(ct_event), filter);
if (ret < 0)
panic("Cannot attach filter to handle: %s\n", strerror(errno));
- nfct_callback_register(handle, NFCT_T_ALL, collector_cb, NULL);
nfct_filter_destroy(filter);
+
+ nfct_callback_register(ct_event, NFCT_T_ALL, collector_cb, NULL);
flow_list_init(&flow_list);
+ ct_dump = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_UPDATE);
+ if (!ct_dump)
+ panic("Cannot create a nfct handle: %s\n", strerror(errno));
+
+ nfct_callback_register(ct_dump, NFCT_T_ALL, dump_cb, NULL);
+
+ poll_fd[0].fd = nfct_fd(ct_event);
+ poll_fd[0].events = POLLIN;
+
+ if (fcntl(nfct_fd(ct_event), F_SETFL, O_NONBLOCK) == -1)
+ panic("Cannot set non-blocking socket: fcntl(): %s\n",
+ strerror(errno));
+
+ if (fcntl(nfct_fd(ct_dump), F_SETFL, O_NONBLOCK) == -1)
+ panic("Cannot set non-blocking socket: fcntl(): %s\n",
+ strerror(errno));
+
condlock_signal(&collector_ready);
rcu_register_thread();
- while (!sigint && ret >= 0)
- ret = nfct_catch(handle);
+ while (!sigint && ret >= 0) {
+ int status;
+
+ usleep(300000);
+
+ collector_refresh_flows(ct_dump);
+
+ status = poll(poll_fd, 1, 0);
+ if (status < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+
+ panic("Error while polling: %s\n", strerror(errno));
+ } else if (status == 0) {
+ continue;
+ }
+
+ if (poll_fd[0].revents & POLLIN)
+ nfct_catch(ct_event);
+ }
rcu_unregister_thread();
flow_list_destroy(&flow_list);
- nfct_close(handle);
+ nfct_close(ct_event);
+ nfct_close(ct_dump);
pthread_exit(NULL);
}
@@ -1157,6 +1327,10 @@ int main(int argc, char **argv)
register_signal(SIGTERM, signal_handler);
register_signal(SIGHUP, signal_handler);
+ panic_handler_add(on_panic_handler, &nfct_acct_val);
+
+ conntrack_acct_enable();
+
init_geoip(1);
condlock_init(&collector_ready);
@@ -1171,5 +1345,7 @@ int main(int argc, char **argv)
destroy_geoip();
+ restore_sysctl(&nfct_acct_val);
+
return 0;
}