summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVadim Kochan <vadim4j@gmail.com>2015-08-01 11:04:09 +0300
committerTobias Klauser <tklauser@distanz.ch>2015-08-03 09:55:35 +0200
commite23a0ad4457fc8a0dc07c6c9c14764033a0b644a (patch)
tree117968f921e2f0a9c7f7812d6e4dce7e335c38ab
parentc2e0fe0f4fd8e539ffdda6ec5656a969a1582757 (diff)
flowtop: Get rid of flushing flows by dumping ipv4/ipv6 tables
Get rid of flushing connections which resets all counters. Use dump whole ipv4/ipv6 connection tables to fullfill the existing flows, but this needs to use hand-made flow filtering because nfct_filter does not work when we do NFCT_Q_DUMP. Signed-off-by: Vadim Kochan <vadim4j@gmail.com> Signed-off-by: Tobias Klauser <tklauser@distanz.ch>
-rw-r--r--flowtop.c152
1 files changed, 122 insertions, 30 deletions
diff --git a/flowtop.c b/flowtop.c
index ac929ec..16c8b68 100644
--- a/flowtop.c
+++ b/flowtop.c
@@ -83,6 +83,7 @@ struct flow_list {
#define INCLUDE_ICMP (1 << 5)
#define INCLUDE_SCTP (1 << 6)
+static volatile bool is_flow_collecting;
static volatile sig_atomic_t sigint = 0;
static int what = INCLUDE_IPV4 | INCLUDE_IPV6 | INCLUDE_TCP, show_src = 0;
static struct flow_list flow_list;
@@ -957,14 +958,21 @@ static void presenter_screen_update(WINDOW *screen, struct flow_list *fl,
maxy -= (2 + 1 * show_src);
}
- mvwprintw(screen, 1, 2, "Kernel netfilter flows(%u) for %s%s%s%s%s%s"
- "[+%d]", flows, what & INCLUDE_TCP ? "TCP, " : "" ,
- what & INCLUDE_UDP ? "UDP, " : "",
- what & INCLUDE_SCTP ? "SCTP, " : "",
- what & INCLUDE_DCCP ? "DCCP, " : "",
- what & INCLUDE_ICMP && what & INCLUDE_IPV4 ? "ICMP, " : "",
- what & INCLUDE_ICMP && what & INCLUDE_IPV6 ? "ICMP6, " : "",
- skip_lines);
+ if (is_flow_collecting) {
+ mvwprintw(screen, 1, 2, "Collecting flows ...");
+ } else {
+ mvwprintw(screen, 1, 2,
+ "Kernel netfilter flows(%u) for %s%s%s%s%s%s"
+ "[+%d]", flows, what & INCLUDE_TCP ? "TCP, " : "",
+ what & INCLUDE_UDP ? "UDP, " : "",
+ what & INCLUDE_SCTP ? "SCTP, " : "",
+ what & INCLUDE_DCCP ? "DCCP, " : "",
+ what & INCLUDE_ICMP && what & INCLUDE_IPV4 ?
+ "ICMP, " : "",
+ what & INCLUDE_ICMP && what & INCLUDE_IPV6 ?
+ "ICMP6, " : "",
+ skip_lines);
+ }
rcu_read_unlock();
@@ -1044,24 +1052,6 @@ static int flow_event_cb(enum nf_conntrack_msg_type type,
return NFCT_CB_CONTINUE;
}
-static inline void collector_flush(void)
-{
- struct nfct_handle *nfct = nfct_open(CONNTRACK, 0);
- uint8_t family;
-
- if (!nfct)
- panic("Cannot create a nfct to flush connections: %s\n",
- strerror(errno));
-
- family = AF_INET;
- nfct_query(nfct, NFCT_Q_FLUSH, &family);
-
- family = AF_INET6;
- nfct_query(nfct, NFCT_Q_FLUSH, &family);
-
- nfct_close(nfct);
-}
-
static void restore_sysctl(void *value)
{
int int_val = *(int *)value;
@@ -1165,12 +1155,111 @@ static void collector_create_filter(struct nfct_handle *nfct)
nfct_filter_destroy(filter);
}
+/* This hand-crafted filter looks ugly but it allows to do not
+ * flush nfct connections & filter them by user specified filter.
+ * May be it is better to replace this one by nfct_cmp. */
+static int flow_dump_cb(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct, void *data __maybe_unused)
+{
+ struct flow_entry fl;
+ struct flow_entry *n = &fl;
+
+ if (sigint)
+ return NFCT_CB_STOP;
+
+ synchronize_rcu();
+ spinlock_lock(&flow_list.lock);
+
+ if (!(what & ~(INCLUDE_IPV4 | INCLUDE_IPV6)))
+ goto check_addr;
+
+ CP_NFCT(l4_proto, ATTR_ORIG_L4PROTO, 8);
+
+ if (what & INCLUDE_UDP) {
+ if (n->l4_proto == IPPROTO_UDP)
+ goto check_addr;
+
+ if (n->l4_proto == IPPROTO_UDPLITE)
+ goto check_addr;
+
+ }
+ if ((what & INCLUDE_TCP) && n->l4_proto == IPPROTO_TCP)
+ goto check_addr;
+
+ if ((what & INCLUDE_DCCP) && n->l4_proto == IPPROTO_DCCP)
+ goto check_addr;
+
+ if ((what & INCLUDE_SCTP) && n->l4_proto == IPPROTO_SCTP)
+ goto check_addr;
+
+ if ((what & INCLUDE_ICMP) && (what & INCLUDE_IPV4) &&
+ n->l4_proto == IPPROTO_ICMP) {
+ goto check_addr;
+ }
+
+ if ((what & INCLUDE_ICMP) && (what & INCLUDE_IPV6) &&
+ n->l4_proto == IPPROTO_ICMPV6) {
+ goto check_addr;
+ }
+
+ goto skip_flow;
+
+check_addr:
+ /* filter loopback addresses */
+ if (what & INCLUDE_IPV4) {
+ CP_NFCT(ip4_src_addr, ATTR_ORIG_IPV4_SRC, 32);
+
+ if (n->ip4_src_addr == filter_ipv4.addr)
+ goto skip_flow;
+ }
+ if (what & INCLUDE_IPV6) {
+ CP_NFCT_BUFF(ip6_src_addr, ATTR_ORIG_IPV6_SRC);
+
+ if (n->ip6_src_addr[0] == 0x0 &&
+ n->ip6_src_addr[1] == 0x0 &&
+ n->ip6_src_addr[2] == 0x0 &&
+ n->ip6_src_addr[3] == 0x1)
+ goto skip_flow;
+ }
+
+ flow_list_new_entry(&flow_list, ct);
+
+skip_flow:
+ spinlock_unlock(&flow_list.lock);
+ return NFCT_CB_CONTINUE;
+}
+
+static void collector_dump_flows(void)
+{
+ struct nfct_handle *nfct = nfct_open(CONNTRACK, 0);
+
+ if (!nfct)
+ panic("Cannot create a nfct handle: %s\n", strerror(errno));
+
+ nfct_callback_register(nfct, NFCT_T_ALL, flow_dump_cb, NULL);
+
+ is_flow_collecting = true;
+ if (what & INCLUDE_IPV4) {
+ int family = AF_INET;
+ nfct_query(nfct, NFCT_Q_DUMP, &family);
+ }
+ if (what & INCLUDE_IPV6) {
+ int family = AF_INET6;
+ nfct_query(nfct, NFCT_Q_DUMP, &family);
+ }
+ is_flow_collecting = false;
+
+ nfct_close(nfct);
+}
+
static void *collector(void *null __maybe_unused)
{
struct nfct_handle *ct_update;
struct nfct_handle *ct_event;
struct pollfd poll_fd[1];
+ flow_list_init(&flow_list);
+
ct_event = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
NF_NETLINK_CONNTRACK_UPDATE |
NF_NETLINK_CONNTRACK_DESTROY);
@@ -1180,7 +1269,6 @@ static void *collector(void *null __maybe_unused)
collector_create_filter(ct_event);
nfct_callback_register(ct_event, NFCT_T_ALL, flow_event_cb, NULL);
- flow_list_init(&flow_list);
ct_update = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_UPDATE);
if (!ct_update)
@@ -1199,10 +1287,10 @@ static void *collector(void *null __maybe_unused)
panic("Cannot set non-blocking socket: fcntl(): %s\n",
strerror(errno));
- collector_flush();
-
rcu_register_thread();
+ collector_dump_flows();
+
while (!sigint) {
int status;
@@ -1283,9 +1371,13 @@ int main(int argc, char **argv)
}
}
- if (what_cmd > 0)
+ if (what_cmd > 0) {
what = what_cmd;
+ if (!(what & (INCLUDE_IPV4 | INCLUDE_IPV6)))
+ what |= INCLUDE_IPV4 | INCLUDE_IPV6;
+ }
+
rcu_init();
register_signal(SIGINT, signal_handler);