diff --git a/tcp.c b/tcp.c index 76e78b1..7eef386 100644 --- a/tcp.c +++ b/tcp.c @@ -603,6 +603,9 @@ static struct msghdr tcp6_l2_mh_sock; static struct mmsghdr tcp_l2_mh_tap [TCP_TAP_FRAMES]; +/* sendmsg() to socket */ +static struct iovec tcp_tap_iov [TAP_MSGS]; + /* Bitmap, activity monitoring needed for connection via tap */ static uint8_t tcp_act[MAX_TAP_CONNS / 8] = { 0 }; @@ -1479,38 +1482,6 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) } } -/** - * tcp_send_to_sock() - Send buffer to socket, update timestamp and sequence - * @c: Execution context - * @conn: Connection pointer - * @data: Data buffer - * @len: Length at L4 - * @extra_flags: Additional flags for send(), if any - * - * Return: negative on socket error with connection reset, 0 otherwise - */ -static int tcp_send_to_sock(struct ctx *c, struct tcp_tap_conn *conn, - char *data, int len, int extra_flags) -{ - int err = send(conn->sock, data, len, - MSG_DONTWAIT | MSG_NOSIGNAL | extra_flags); - - if (err < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); - return err; - } - - err = errno; - tcp_rst(c, conn); - return -err; - } - - conn->seq_from_tap += err; - - return 0; -} - /** * tcp_sock_consume() - Consume (discard) data from buffer, update ACK sequence * @conn: Connection pointer @@ -1811,6 +1782,154 @@ out_restore_iov: return ret; } +/** + * tcp_data_from_tap() - tap data in ESTABLISHED{,SOCK_FIN}, CLOSE_WAIT states + * @c: Execution context + * @conn: Connection pointer + * @msg: Array of messages from tap + * @count: Count of messages + * @now: Current timestamp + */ +static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, + struct tap_msg *msg, int count, + struct timespec *now) +{ + struct msghdr mh = { .msg_iov = tcp_tap_iov }; + uint32_t max_ack_seq = conn->seq_ack_from_tap; + uint32_t seq_from_tap = conn->seq_from_tap; + int i, iov_i, keep = -1, ack = 0, fin = 0; + ssize_t len; + + for (i = 0, iov_i = 0; i < count; i++) { + struct tcphdr *th = (struct tcphdr *)msg[i].l4h; + uint32_t seq, seq_offset, ack_seq; + size_t len = msg[i].l4_len, off; + char *data; + + if (len < sizeof(*th)) { + tcp_rst(c, conn); + return; + } + + off = th->doff * 4; + if (off < sizeof(*th) || off > len) { + tcp_rst(c, conn); + return; + } + + if (th->rst) { + tcp_tap_destroy(c, conn); + return; + } + + len -= off; + data = (char *)th + off; + + seq = ntohl(th->seq); + ack_seq = ntohl(th->ack_seq); + + if (th->ack) { + ack = 1; + if (ack_seq - conn->seq_ack_from_tap < MAX_WINDOW && + ack_seq - max_ack_seq < MAX_WINDOW) + max_ack_seq = ack_seq; + } + + if (th->fin) + fin = 1; + + seq_offset = seq_from_tap - seq; + /* Use data from this buffer only in these two cases: + * + * , seq_from_tap , seq_from_tap + * |--------| <-- len |--------| <-- len + * '----' <-- offset ' <-- offset + * ^ seq ^ seq + * + * (offset >= 0, seq + len > seq_from_tap) + * + * discard in these two cases: + * , seq_from_tap , seq_from_tap + * |--------| <-- len |--------| <-- len + * '--------' <-- offset '-----| <- offset + * ^ seq ^ seq + * (offset >= 0, seq + len <= seq_from_tap) + * + * keep, look for another buffer, then go back, in this case: + * , seq_from_tap + * |--------| <-- len + * '===' <-- offset + * ^ seq + * (offset < 0 i.e. > MAX_WINDOW) + */ + if (seq_offset < MAX_WINDOW && seq + len <= seq_from_tap) + continue; + + if (seq_offset > MAX_WINDOW) { + if (keep != -1) + keep = i; + continue; + } + + tcp_tap_iov[iov_i].iov_base = data + seq_offset; + tcp_tap_iov[iov_i].iov_len = len - seq_offset; + seq_from_tap += tcp_tap_iov[iov_i].iov_len; + iov_i++; + + if (keep == i) { + i = keep + 1; + keep = -1; + } + } + + if (ack) { + conn->ts_ack_tap = *now; + tcp_sock_consume(conn, max_ack_seq); + } + + if (!iov_i) { + if (keep != -1) { + tcp_send_to_tap(c, conn, ACK, NULL, 0); + tcp_send_to_tap(c, conn, ACK, NULL, 0); + } + goto fin; + } + + mh.msg_iovlen = iov_i; + len = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL); + if (len < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); + return; + } + tcp_rst(c, conn); + return; + } + + if (len < (seq_from_tap - conn->seq_from_tap)) { + conn->seq_from_tap += len; + tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); + return; + } + + conn->seq_from_tap += len; + tcp_send_to_tap(c, conn, 0, NULL, 0); + +fin: + if (conn->state == ESTABLISHED_SOCK_FIN && ack && + !tcp_data_from_sock(c, conn, now)) + tcp_tap_state(conn, CLOSE_WAIT); + + if (fin) { + shutdown(conn->sock, SHUT_WR); + if (conn->state == ESTABLISHED) + tcp_tap_state(conn, FIN_WAIT_1); + else + tcp_tap_state(conn, LAST_ACK); + return; + } +} + /** * tcp_tap_handler() - Handle packets from tap and state transitions * @c: Execution context @@ -1827,16 +1946,11 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, { union epoll_ref ref = { .proto = IPPROTO_TCP, .tcp.v6 = ( af == AF_INET6 ) }; - - /* TODO: Implement message batching for TCP */ struct tcphdr *th = (struct tcphdr *)msg[0].l4h; - size_t len = msg[0].l4_len; - uint32_t ack_seq; - + size_t len = msg[0].l4_len, off; struct tcp_tap_conn *conn; struct epoll_event ev; - size_t off, skip = 0; - int ws, i; + int ws; if (len < sizeof(*th)) return 1; @@ -1852,41 +1966,15 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, return 1; } - /* TODO: Partial ACK coalescing, merge with message coalescing */ - ack_seq = ntohl(th->ack_seq); - for (i = 0; conn->state == ESTABLISHED && i < count; i++) { - struct tcphdr *__th = (struct tcphdr *)msg[i].l4h; - size_t __len = msg[i].l4_len; - - if (__len < sizeof(*th)) - break; - - off = __th->doff * 4; - if (off < sizeof(*th) || off > __len) - break; - - if (!th->ack) - continue; - - if (ntohl(th->ack_seq) - ack_seq < MAX_WINDOW) - ack_seq = ntohl(th->ack_seq); - } - if (th->rst) { tcp_tap_destroy(c, conn); return 1; } - if (count == 1) - tcp_clamp_window(conn, th, len, th->syn && th->ack); + tcp_clamp_window(conn, th, len, th->syn && th->ack); conn->ts_tap = *now; - if (ntohl(th->seq) < conn->seq_from_tap && - conn->seq_from_tap - ntohl(th->seq) < MAX_WINDOW) { - skip = conn->seq_from_tap - ntohl(th->seq); - } - switch (conn->state) { case SOCK_SYN_SENT: if (!th->syn || !th->ack) { @@ -1951,62 +2039,9 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, break; case ESTABLISHED: case ESTABLISHED_SOCK_FIN: - conn->ts_ack_tap = *now; - - if (ntohl(th->ack_seq) > conn->seq_to_tap && - (conn->seq_to_tap - ntohl(th->ack_seq)) > MAX_WINDOW) { - return count; - } - - if (th->ack) { - if (count == 1) - tcp_sock_consume(conn, ack_seq); - - if (conn->state == ESTABLISHED_SOCK_FIN) { - if (!tcp_data_from_sock(c, conn, now)) - tcp_tap_state(conn, CLOSE_WAIT); - } - } - - if (ntohl(th->seq) > conn->seq_from_tap) { - tcp_send_to_tap(c, conn, ACK, NULL, 0); - tcp_send_to_tap(c, conn, ACK, NULL, 0); - return count; - } - - if (skip < len - off && - tcp_send_to_sock(c, conn, - msg[0].l4h + off + skip, len - off - skip, - (count > 1) ? MSG_MORE : 0)) - return 1; - - if (count == 1) - tcp_send_to_tap(c, conn, ACK, NULL, 0); - - if (th->fin) { - shutdown(conn->sock, SHUT_WR); - if (conn->state == ESTABLISHED) - tcp_tap_state(conn, FIN_WAIT_1); - else - tcp_tap_state(conn, LAST_ACK); - } - - break; case CLOSE_WAIT: - tcp_sock_consume(conn, ntohl(th->ack_seq)); - - if (skip < (len - off) && - tcp_send_to_sock(c, conn, - msg[0].l4h + off + skip, len - off - skip, - th->psh ? 0 : MSG_MORE)) - break; - - if (th->fin) { - shutdown(conn->sock, SHUT_WR); - tcp_tap_state(conn, LAST_ACK); - } - - break; + tcp_data_from_tap(c, conn, msg, count, now); + return count; case FIN_WAIT_1_SOCK_FIN: if (th->ack) tcp_tap_destroy(c, conn);