summaryrefslogtreecommitdiff
path: root/flowtop.c
diff options
context:
space:
mode:
Diffstat (limited to 'flowtop.c')
-rw-r--r--flowtop.c349
1 files changed, 302 insertions, 47 deletions
diff --git a/flowtop.c b/flowtop.c
index 78ac253..2491de7 100644
--- a/flowtop.c
+++ b/flowtop.c
@@ -52,7 +52,25 @@
#define USEC_PER_SEC 1000000L
#endif
+struct proc_entry {
+ struct cds_list_head entry;
+ struct cds_list_head flows;
+ struct rcu_head rcu;
+
+ struct timeval last_update;
+ unsigned int pid;
+ char name[256];
+ uint64_t pkts_src, bytes_src;
+ uint64_t pkts_dst, bytes_dst;
+ double rate_bytes_src;
+ double rate_bytes_dst;
+ double rate_pkts_src;
+ double rate_pkts_dst;
+ int flows_count;
+};
+
struct flow_entry {
+ struct cds_list_head proc_head;
struct cds_list_head entry;
struct rcu_head rcu;
@@ -69,9 +87,8 @@ struct flow_entry {
char country_code_src[4], country_code_dst[4];
char city_src[128], city_dst[128];
char rev_dns_src[256], rev_dns_dst[256];
- char procname[256];
+ struct proc_entry *proc;
int inode;
- unsigned int procnum;
bool is_visible;
struct nf_conntrack *ct;
struct timeval last_update;
@@ -85,6 +102,10 @@ struct flow_list {
struct cds_list_head head;
};
+struct proc_list {
+ struct cds_list_head head;
+};
+
enum flow_direction {
FLOW_DIR_SRC,
FLOW_DIR_DST,
@@ -129,6 +150,7 @@ static volatile bool do_reload_flows;
static volatile bool is_flow_collecting;
static volatile sig_atomic_t sigint = 0;
static int what = INCLUDE_IPV4 | INCLUDE_IPV6 | INCLUDE_TCP;
+static struct proc_list proc_list;
static struct flow_list flow_list;
static struct sysctl_params_ctx sysctl = { -1, -1 };
@@ -156,10 +178,22 @@ enum tbl_flow_col {
TBL_FLOW_RATE,
};
+enum tbl_proc_col {
+ TBL_PROC_NAME,
+ TBL_PROC_PID,
+ TBL_PROC_FLOWS,
+ TBL_PROC_BYTES_SRC,
+ TBL_PROC_RATE_SRC,
+ TBL_PROC_BYTES_DST,
+ TBL_PROC_RATE_DST,
+};
+
static struct ui_table flows_tbl;
+static struct ui_table procs_tbl;
enum tab_entry {
TAB_FLOWS,
+ TAB_PROCS,
};
static const char *short_options = "vhTUsDIS46ut:nGb";
@@ -432,6 +466,11 @@ static int flow_list_del_entry(struct flow_list *fl, const struct nf_conntrack *
n = flow_list_find_id(fl, nfct_get_attr_u32(ct, ATTR_ID));
if (n) {
+ if (n->proc) {
+ cds_list_del(&n->proc_head);
+ n->proc->flows_count--;
+ }
+
cds_list_del_rcu(&n->entry);
call_rcu(&n->rcu, flow_entry_xfree_rcu);
}
@@ -449,22 +488,72 @@ static void flow_list_destroy(struct flow_list *fl)
}
}
+static void proc_list_init(struct proc_list *proc_list)
+{
+ CDS_INIT_LIST_HEAD(&proc_list->head);
+}
+
+static struct proc_entry *proc_list_new_entry(unsigned int pid)
+{
+ struct proc_entry *proc;
+
+ cds_list_for_each_entry(proc, &proc_list.head, entry) {
+ if (proc->pid && proc->pid == pid)
+ return proc;
+ }
+
+ proc = xzmalloc(sizeof(*proc));
+
+ bug_on(gettimeofday(&proc->last_update, NULL));
+ CDS_INIT_LIST_HEAD(&proc->flows);
+ proc->pid = pid;
+
+ cds_list_add_tail(&proc->entry, &proc_list.head);
+
+ return proc;
+}
+
+static void proc_entry_xfree_rcu(struct rcu_head *head)
+{
+ struct proc_entry *p = container_of(head, struct proc_entry, rcu);
+
+ xfree(p);
+}
+
+static void proc_list_destroy(struct proc_list *pl)
+{
+ struct proc_entry *p, *tmp;
+
+ cds_list_for_each_entry_safe(p, tmp, &pl->head, entry) {
+ cds_list_del_rcu(&p->entry);
+ call_rcu(&p->rcu, proc_entry_xfree_rcu);
+ }
+}
+
static void flow_entry_find_process(struct flow_entry *n)
{
+ struct proc_entry *p;
char cmdline[512];
pid_t pid;
int ret;
ret = proc_find_by_inode(n->inode, cmdline, sizeof(cmdline), &pid);
- if (ret <= 0) {
- n->procname[0] = '\0';
+ if (ret <= 0)
return;
- }
- if (snprintf(n->procname, sizeof(n->procname), "%s", basename(cmdline)) < 0)
- n->procname[0] = '\0';
+ p = proc_list_new_entry(pid);
+
+ if (snprintf(p->name, sizeof(p->name), "%s", basename(cmdline)) < 0)
+ p->name[0] = '\0';
- n->procnum = pid;
+ p->pkts_src += n->pkts_src;
+ p->pkts_dst += n->pkts_dst;
+ p->bytes_src += n->bytes_src;
+ p->bytes_dst += n->bytes_dst;
+ p->flows_count++;
+
+ cds_list_add(&n->proc_head, &p->flows);
+ n->proc = p;
}
static int get_port_inode(uint16_t port, int proto, bool is_ip6)
@@ -513,6 +602,19 @@ static int get_port_inode(uint16_t port, int proto, bool is_ip6)
static void flow_entry_from_ct(struct flow_entry *n, const struct nf_conntrack *ct)
{
+ uint64_t bytes_src = nfct_get_attr_u64(ct, ATTR_ORIG_COUNTER_BYTES);
+ uint64_t bytes_dst = nfct_get_attr_u64(ct, ATTR_REPL_COUNTER_BYTES);
+ uint64_t pkts_src = nfct_get_attr_u64(ct, ATTR_ORIG_COUNTER_PACKETS);
+ uint64_t pkts_dst = nfct_get_attr_u64(ct, ATTR_REPL_COUNTER_PACKETS);
+
+ /* Update stats diff to the related process entry */
+ if (n->proc) {
+ n->proc->pkts_src += pkts_src - n->pkts_src;
+ n->proc->pkts_dst += pkts_dst - n->pkts_dst;
+ n->proc->bytes_src += bytes_src - n->bytes_src;
+ n->proc->bytes_dst += bytes_dst - n->bytes_dst;
+ }
+
CP_NFCT(l3_proto, ATTR_ORIG_L3PROTO, 8);
CP_NFCT(l4_proto, ATTR_ORIG_L4PROTO, 8);
@@ -916,10 +1018,11 @@ static void draw_flow_entry(const struct flow_entry *n)
ui_table_row_add(&flows_tbl);
/* Application */
- ui_table_row_col_set(&flows_tbl, TBL_FLOW_PROCESS, n->procname);
+ ui_table_row_col_set(&flows_tbl, TBL_FLOW_PROCESS,
+ n->proc ? n->proc->name : "");
/* PID */
- slprintf(tmp, sizeof(tmp), "%.d", n->procnum);
+ slprintf(tmp, sizeof(tmp), "%.d", n->proc ? n->proc->pid : 0);
ui_table_row_col_set(&flows_tbl, TBL_FLOW_PID, tmp);
/* L4 protocol */
@@ -1010,6 +1113,37 @@ static inline bool presenter_flow_wrong_state(struct flow_entry *n)
return true;
}
+static void draw_filter_status(char *title, int count, int skip_lines)
+{
+ mvwprintw(screen, 1, 0, "%*s", COLS - 1, " ");
+ mvwprintw(screen, 1, 2, "%s(%u) for ", title, count);
+
+ if (what & INCLUDE_IPV4)
+ printw("IPv4,");
+ if (what & INCLUDE_IPV6)
+ printw("IPv6,");
+ if (what & INCLUDE_TCP)
+ printw("TCP,");
+ if (what & INCLUDE_UDP)
+ printw("UDP,");
+ if (what & INCLUDE_SCTP)
+ printw("SCTP,");
+ if (what & INCLUDE_DCCP)
+ printw("DCCP,");
+ if (what & INCLUDE_ICMP && what & INCLUDE_IPV4)
+ printw("ICMP,");
+ if (what & INCLUDE_ICMP && what & INCLUDE_IPV6)
+ printw("ICMP6,");
+ if (show_active_only)
+ printw("Active,");
+
+ printw(" [+%d]", skip_lines);
+
+ if (is_flow_collecting)
+ printw(" [Collecting flows ...]");
+
+}
+
static void draw_flows(WINDOW *screen, struct flow_list *fl,
int skip_lines)
{
@@ -1051,29 +1185,73 @@ static void draw_flows(WINDOW *screen, struct flow_list *fl,
mvwprintw(screen, 1, 0, "%*s", COLS - 1, " ");
mvwprintw(screen, 1, 2, "Kernel netfilter flows(%u) for ", flows);
- if (what & INCLUDE_IPV4)
- printw("IPv4,");
- if (what & INCLUDE_IPV6)
- printw("IPv6,");
- if (what & INCLUDE_TCP)
- printw("TCP,");
- if (what & INCLUDE_UDP)
- printw("UDP,");
- if (what & INCLUDE_SCTP)
- printw("SCTP,");
- if (what & INCLUDE_DCCP)
- printw("DCCP,");
- if (what & INCLUDE_ICMP && what & INCLUDE_IPV4)
- printw("ICMP,");
- if (what & INCLUDE_ICMP && what & INCLUDE_IPV6)
- printw("ICMP6,");
- if (show_active_only)
- printw("Active,");
+ draw_filter_status("Kernel netfilter flows", flows, skip_lines);
+}
- printw(" [+%d]", skip_lines);
+static void draw_proc_entry(struct proc_entry *p)
+{
+ struct ui_table *tbl = &procs_tbl;;
+ char tmp[128];
- if (is_flow_collecting)
- printw(" [Collecting flows ...]");
+ ui_table_row_add(tbl);
+
+ /* Application */
+ ui_table_row_col_set(tbl, TBL_PROC_NAME, p->name);
+
+ /* PID */
+ slprintf(tmp, sizeof(tmp), "%.d", p->pid);
+ ui_table_row_col_set(tbl, TBL_PROC_PID, tmp);
+
+ /* Flows */
+ slprintf(tmp, sizeof(tmp), "%.d", p->flows_count);
+ ui_table_row_col_set(tbl, TBL_PROC_FLOWS, tmp);
+
+ /* Bytes Src */
+ bandw2str(p->bytes_src, tmp, sizeof(tmp) - 1);
+ ui_table_row_col_set(tbl, TBL_PROC_BYTES_SRC, tmp);
+
+ /* Rate Src */
+ rate2str(p->rate_bytes_src, tmp, sizeof(tmp) - 1);
+ ui_table_row_col_set(tbl, TBL_PROC_RATE_SRC, tmp);
+
+ /* Bytes Dest */
+ bandw2str(p->bytes_dst, tmp, sizeof(tmp) - 1);
+ ui_table_row_col_set(tbl, TBL_PROC_BYTES_DST, tmp);
+
+ /* Rate Dest */
+ rate2str(p->rate_bytes_dst, tmp, sizeof(tmp) - 1);
+ ui_table_row_col_set(tbl, TBL_PROC_RATE_DST, tmp);
+
+ ui_table_row_show(tbl);
+}
+
+static void draw_procs(WINDOW *screen, struct flow_list *fl,
+ int skip_lines)
+{
+ struct proc_entry *proc, *tmp;
+ unsigned int line = 4;
+ int skip = skip_lines;
+ int procs = 0;
+
+ rcu_read_lock();
+
+ ui_table_clear(&procs_tbl);
+ ui_table_header_print(&procs_tbl);
+
+ cds_list_for_each_entry_safe(proc, tmp, &proc_list.head, entry) {
+ if (line + 1 >= rows)
+ continue;
+ if (--skip >= 0)
+ continue;
+
+ draw_proc_entry(proc);
+ procs++;
+ line++;
+ }
+
+ rcu_read_unlock();
+
+ draw_filter_status("Processes", procs, skip_lines);
}
static void draw_help(void)
@@ -1104,26 +1282,27 @@ static void draw_help(void)
mvaddnstr(row + 2, col + 2, "Navigation", -1);
attroff(A_BOLD | A_UNDERLINE);
- mvaddnstr(row + 4, col + 3, "Up, u, k Move up", -1);
- mvaddnstr(row + 5, col + 3, "Down, d, j Move down", -1);
- mvaddnstr(row + 6, col + 3, "Left,l Scroll left", -1);
- mvaddnstr(row + 7, col + 3, "Right,h Scroll right", -1);
- mvaddnstr(row + 8, col + 3, "? Toggle help window", -1);
- mvaddnstr(row + 9, col + 3, "q, Ctrl+C Quit", -1);
+ mvaddnstr(row + 4, col + 3, "TAB Go to next tab panel", -1);
+ mvaddnstr(row + 5, col + 3, "Up, u, k Move up", -1);
+ mvaddnstr(row + 6, col + 3, "Down, d, j Move down", -1);
+ mvaddnstr(row + 7, col + 3, "Left,l Scroll left", -1);
+ mvaddnstr(row + 8, col + 3, "Right,h Scroll right", -1);
+ mvaddnstr(row + 9, col + 3, "? Toggle help window", -1);
+ mvaddnstr(row + 10, col + 3, "q, Ctrl+C Quit", -1);
attron(A_BOLD | A_UNDERLINE);
- mvaddnstr(row + 11, col + 2, "Display Settings", -1);
+ mvaddnstr(row + 12, col + 2, "Display Settings", -1);
attroff(A_BOLD | A_UNDERLINE);
- mvaddnstr(row + 13, col + 3, "b Toggle rate units (bits/bytes)", -1);
- mvaddnstr(row + 14, col + 3, "a Toggle display of active flows (rate > 0) only", -1);
- mvaddnstr(row + 15, col + 3, "s Toggle show source peer info", -1);
+ mvaddnstr(row + 14, col + 3, "b Toggle rate units (bits/bytes)", -1);
+ mvaddnstr(row + 15, col + 3, "a Toggle display of active flows (rate > 0) only", -1);
+ mvaddnstr(row + 16, col + 3, "s Toggle show source peer info", -1);
- mvaddnstr(row + 17, col + 3, "T Toggle display TCP flows", -1);
- mvaddnstr(row + 18, col + 3, "U Toggle display UDP flows", -1);
- mvaddnstr(row + 19, col + 3, "D Toggle display DCCP flows", -1);
- mvaddnstr(row + 20, col + 3, "I Toggle display ICMP flows", -1);
- mvaddnstr(row + 21, col + 3, "S Toggle display SCTP flows", -1);
+ mvaddnstr(row + 18, col + 3, "T Toggle display TCP flows", -1);
+ mvaddnstr(row + 19, col + 3, "U Toggle display UDP flows", -1);
+ mvaddnstr(row + 20, col + 3, "D Toggle display DCCP flows", -1);
+ mvaddnstr(row + 21, col + 3, "I Toggle display ICMP flows", -1);
+ mvaddnstr(row + 22, col + 3, "S Toggle display SCTP flows", -1);
}
static void draw_header(WINDOW *screen)
@@ -1203,9 +1382,46 @@ static void flows_table_init(struct ui_table *tbl)
ui_table_header_color_set(&flows_tbl, COLOR(BLACK, GREEN));
}
+static void procs_table_init(struct ui_table *tbl)
+{
+ ui_table_init(tbl);
+
+ ui_table_pos_set(tbl, 3, 0);
+ ui_table_height_set(tbl, LINES - 3);
+
+ ui_table_col_add(tbl, TBL_PROC_NAME, "NAME", 13);
+ ui_table_col_add(tbl, TBL_PROC_PID, "PID", 7);
+ ui_table_col_add(tbl, TBL_PROC_FLOWS, "FLOWS", 7);
+ ui_table_col_add(tbl, TBL_PROC_BYTES_SRC, "BYTES_SRC", 10);
+ ui_table_col_add(tbl, TBL_PROC_BYTES_DST, "BYTES_DST", 10);
+ ui_table_col_add(tbl, TBL_PROC_RATE_SRC, "RATE_SRC", 14);
+ ui_table_col_add(tbl, TBL_PROC_RATE_DST, "RATE_DST", 14);
+
+ ui_table_col_align_set(tbl, TBL_PROC_BYTES_SRC, UI_ALIGN_RIGHT);
+ ui_table_col_align_set(tbl, TBL_PROC_RATE_SRC, UI_ALIGN_RIGHT);
+ ui_table_col_align_set(tbl, TBL_PROC_BYTES_DST, UI_ALIGN_RIGHT);
+ ui_table_col_align_set(tbl, TBL_PROC_RATE_DST, UI_ALIGN_RIGHT);
+
+ ui_table_col_color_set(tbl, TBL_PROC_NAME, COLOR(YELLOW, BLACK));
+ ui_table_col_color_set(tbl, TBL_PROC_PID, A_BOLD);
+ ui_table_col_color_set(tbl, TBL_PROC_FLOWS, COLOR(YELLOW, BLACK));
+ ui_table_col_color_set(tbl, TBL_PROC_BYTES_SRC, COLOR(RED, BLACK));
+ ui_table_col_color_set(tbl, TBL_PROC_RATE_SRC, COLOR(RED, BLACK));
+ ui_table_col_color_set(tbl, TBL_PROC_BYTES_DST, COLOR(BLUE, BLACK));
+ ui_table_col_color_set(tbl, TBL_PROC_RATE_DST, COLOR(BLUE, BLACK));
+
+ ui_table_header_color_set(tbl, COLOR(BLACK, GREEN));
+}
+
static void tab_main_on_open(struct ui_tab *tab, enum ui_tab_event_t evt, uint32_t id)
{
- draw_flows(screen, &flow_list, skip_lines);
+ if (evt != UI_TAB_EVT_OPEN)
+ return;
+
+ if (id == TAB_FLOWS)
+ draw_flows(screen, &flow_list, skip_lines);
+ else if (id == TAB_PROCS)
+ draw_procs(screen, &flow_list, skip_lines);
}
static void presenter(void)
@@ -1228,12 +1444,14 @@ static void presenter(void)
INIT_COLOR(BLACK, GREEN);
flows_table_init(&flows_tbl);
+ procs_table_init(&procs_tbl);
tab_main = ui_tab_create();
ui_tab_event_cb_set(tab_main, tab_main_on_open);
ui_tab_pos_set(tab_main, 2, 0);
ui_tab_active_color_set(tab_main, COLOR(BLACK, GREEN));
ui_tab_entry_add(tab_main, TAB_FLOWS, "Flows");
+ ui_tab_entry_add(tab_main, TAB_PROCS, "Process");
rcu_register_thread();
while (!sigint) {
@@ -1314,6 +1532,7 @@ static void presenter(void)
rcu_unregister_thread();
ui_table_uninit(&flows_tbl);
+ ui_table_uninit(&procs_tbl);
ui_tab_destroy(tab_main);
screen_end();
@@ -1417,6 +1636,39 @@ static int flow_event_cb(enum nf_conntrack_msg_type type,
}
}
+static void collector_refresh_procs(void)
+{
+ struct proc_entry *p, *tmp;
+
+ cds_list_for_each_entry_safe(p, tmp, &proc_list.head, entry) {
+ double sec = (double)time_after_us(&p->last_update) / USEC_PER_SEC;
+ struct flow_entry *n;
+
+ if (sec < 1)
+ continue;
+
+ bug_on(gettimeofday(&p->last_update, NULL));
+
+ if (!p->flows_count && !proc_exists(p->pid)) {
+ cds_list_del_rcu(&p->entry);
+ call_rcu(&p->rcu, proc_entry_xfree_rcu);
+ continue;
+ }
+
+ p->rate_bytes_src = 0;
+ p->rate_bytes_dst = 0;
+ p->rate_pkts_src = 0;
+ p->rate_pkts_dst = 0;
+
+ cds_list_for_each_entry_rcu(n, &p->flows, proc_head) {
+ p->rate_bytes_src += n->rate_bytes_src;
+ p->rate_bytes_dst += n->rate_bytes_dst;
+ p->rate_pkts_src += n->rate_pkts_src;
+ p->rate_pkts_dst += n->rate_pkts_dst;
+ }
+ }
+}
+
static void collector_refresh_flows(struct nfct_handle *handle)
{
struct flow_entry *n;
@@ -1563,6 +1815,7 @@ static void *collector(void *null __maybe_unused)
struct nfct_handle *ct_event;
struct pollfd poll_fd[1];
+ proc_list_init(&proc_list);
flow_list_init(&flow_list);
ct_event = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
@@ -1600,6 +1853,7 @@ static void *collector(void *null __maybe_unused)
collector_dump_flows();
}
+ collector_refresh_procs();
collector_refresh_flows(ct_event);
status = poll(poll_fd, 1, 0);
@@ -1615,6 +1869,7 @@ static void *collector(void *null __maybe_unused)
}
flow_list_destroy(&flow_list);
+ proc_list_destroy(&proc_list);
rcu_unregister_thread();