tcp: Full batched processing for tap messages

Similar to UDP, but using a simple sendmsg() on iovec-style buffers
from tap instead, as we don't need to preserve message boundaries.

A quick test in PASTA mode, from namespace to init via tap:

 # ip link set dev pasta0 mtu 16384
 # iperf3 -c 192.168.1.222 -t 60
   [...]
 [ ID] Interval           Transfer     Bitrate
 [  5]   0.00-60.00  sec  80.4 GBytes  11.5 Gbits/sec                  receiver

 # iperf3 -c 2a02:6d40:3cfc:3a01:2b20:4a6a:c25a:3056 -t 60
   [...]
 [ ID] Interval           Transfer     Bitrate
 [  5]   0.00-60.01  sec  39.9 GBytes  5.71 Gbits/sec                  receiver

 # ip link set dev pasta0 mtu 65520
 # iperf3 -c 192.168.1.222 -t 60
   [...]
 [ ID] Interval           Transfer     Bitrate
 [  5]   0.00-60.01  sec  88.7 GBytes  12.7 Gbits/sec                  receiver

 # iperf3 -c 2a02:6d40:3cfc:3a01:2b20:4a6a:c25a:3056 -t 60
   [...]
 [ ID] Interval           Transfer     Bitrate
 [  5]   0.00-60.00  sec  79.5 GBytes  11.4 Gbits/sec                  receiver

Signed-off-by: Stefano Brivio <sbrivio@redhat.com>
This commit is contained in:
Stefano Brivio 2021-07-27 01:09:45 +02:00
parent fd5050ccba
commit dc169643a4

277
tcp.c
View file

@ -603,6 +603,9 @@ static struct msghdr tcp6_l2_mh_sock;
static struct mmsghdr tcp_l2_mh_tap [TCP_TAP_FRAMES];
/* sendmsg() to socket */
static struct iovec tcp_tap_iov [TAP_MSGS];
/* Bitmap, activity monitoring needed for connection via tap */
static uint8_t tcp_act[MAX_TAP_CONNS / 8] = { 0 };
@ -1479,38 +1482,6 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
}
}
/**
* tcp_send_to_sock() - Send buffer to socket, update timestamp and sequence
* @c: Execution context
* @conn: Connection pointer
* @data: Data buffer
* @len: Length at L4
* @extra_flags: Additional flags for send(), if any
*
* Return: negative on socket error with connection reset, 0 otherwise
*/
static int tcp_send_to_sock(struct ctx *c, struct tcp_tap_conn *conn,
char *data, int len, int extra_flags)
{
int err = send(conn->sock, data, len,
MSG_DONTWAIT | MSG_NOSIGNAL | extra_flags);
if (err < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0);
return err;
}
err = errno;
tcp_rst(c, conn);
return -err;
}
conn->seq_from_tap += err;
return 0;
}
/**
* tcp_sock_consume() - Consume (discard) data from buffer, update ACK sequence
* @conn: Connection pointer
@ -1811,6 +1782,154 @@ out_restore_iov:
return ret;
}
/**
* tcp_data_from_tap() - tap data in ESTABLISHED{,SOCK_FIN}, CLOSE_WAIT states
* @c: Execution context
* @conn: Connection pointer
* @msg: Array of messages from tap
* @count: Count of messages
* @now: Current timestamp
*/
static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
struct tap_msg *msg, int count,
struct timespec *now)
{
struct msghdr mh = { .msg_iov = tcp_tap_iov };
uint32_t max_ack_seq = conn->seq_ack_from_tap;
uint32_t seq_from_tap = conn->seq_from_tap;
int i, iov_i, keep = -1, ack = 0, fin = 0;
ssize_t len;
for (i = 0, iov_i = 0; i < count; i++) {
struct tcphdr *th = (struct tcphdr *)msg[i].l4h;
uint32_t seq, seq_offset, ack_seq;
size_t len = msg[i].l4_len, off;
char *data;
if (len < sizeof(*th)) {
tcp_rst(c, conn);
return;
}
off = th->doff * 4;
if (off < sizeof(*th) || off > len) {
tcp_rst(c, conn);
return;
}
if (th->rst) {
tcp_tap_destroy(c, conn);
return;
}
len -= off;
data = (char *)th + off;
seq = ntohl(th->seq);
ack_seq = ntohl(th->ack_seq);
if (th->ack) {
ack = 1;
if (ack_seq - conn->seq_ack_from_tap < MAX_WINDOW &&
ack_seq - max_ack_seq < MAX_WINDOW)
max_ack_seq = ack_seq;
}
if (th->fin)
fin = 1;
seq_offset = seq_from_tap - seq;
/* Use data from this buffer only in these two cases:
*
* , seq_from_tap , seq_from_tap
* |--------| <-- len |--------| <-- len
* '----' <-- offset ' <-- offset
* ^ seq ^ seq
*
* (offset >= 0, seq + len > seq_from_tap)
*
* discard in these two cases:
* , seq_from_tap , seq_from_tap
* |--------| <-- len |--------| <-- len
* '--------' <-- offset '-----| <- offset
* ^ seq ^ seq
* (offset >= 0, seq + len <= seq_from_tap)
*
* keep, look for another buffer, then go back, in this case:
* , seq_from_tap
* |--------| <-- len
* '===' <-- offset
* ^ seq
* (offset < 0 i.e. > MAX_WINDOW)
*/
if (seq_offset < MAX_WINDOW && seq + len <= seq_from_tap)
continue;
if (seq_offset > MAX_WINDOW) {
if (keep != -1)
keep = i;
continue;
}
tcp_tap_iov[iov_i].iov_base = data + seq_offset;
tcp_tap_iov[iov_i].iov_len = len - seq_offset;
seq_from_tap += tcp_tap_iov[iov_i].iov_len;
iov_i++;
if (keep == i) {
i = keep + 1;
keep = -1;
}
}
if (ack) {
conn->ts_ack_tap = *now;
tcp_sock_consume(conn, max_ack_seq);
}
if (!iov_i) {
if (keep != -1) {
tcp_send_to_tap(c, conn, ACK, NULL, 0);
tcp_send_to_tap(c, conn, ACK, NULL, 0);
}
goto fin;
}
mh.msg_iovlen = iov_i;
len = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL);
if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0);
return;
}
tcp_rst(c, conn);
return;
}
if (len < (seq_from_tap - conn->seq_from_tap)) {
conn->seq_from_tap += len;
tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0);
return;
}
conn->seq_from_tap += len;
tcp_send_to_tap(c, conn, 0, NULL, 0);
fin:
if (conn->state == ESTABLISHED_SOCK_FIN && ack &&
!tcp_data_from_sock(c, conn, now))
tcp_tap_state(conn, CLOSE_WAIT);
if (fin) {
shutdown(conn->sock, SHUT_WR);
if (conn->state == ESTABLISHED)
tcp_tap_state(conn, FIN_WAIT_1);
else
tcp_tap_state(conn, LAST_ACK);
return;
}
}
/**
* tcp_tap_handler() - Handle packets from tap and state transitions
* @c: Execution context
@ -1827,16 +1946,11 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
{
union epoll_ref ref = { .proto = IPPROTO_TCP,
.tcp.v6 = ( af == AF_INET6 ) };
/* TODO: Implement message batching for TCP */
struct tcphdr *th = (struct tcphdr *)msg[0].l4h;
size_t len = msg[0].l4_len;
uint32_t ack_seq;
size_t len = msg[0].l4_len, off;
struct tcp_tap_conn *conn;
struct epoll_event ev;
size_t off, skip = 0;
int ws, i;
int ws;
if (len < sizeof(*th))
return 1;
@ -1852,41 +1966,15 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
return 1;
}
/* TODO: Partial ACK coalescing, merge with message coalescing */
ack_seq = ntohl(th->ack_seq);
for (i = 0; conn->state == ESTABLISHED && i < count; i++) {
struct tcphdr *__th = (struct tcphdr *)msg[i].l4h;
size_t __len = msg[i].l4_len;
if (__len < sizeof(*th))
break;
off = __th->doff * 4;
if (off < sizeof(*th) || off > __len)
break;
if (!th->ack)
continue;
if (ntohl(th->ack_seq) - ack_seq < MAX_WINDOW)
ack_seq = ntohl(th->ack_seq);
}
if (th->rst) {
tcp_tap_destroy(c, conn);
return 1;
}
if (count == 1)
tcp_clamp_window(conn, th, len, th->syn && th->ack);
tcp_clamp_window(conn, th, len, th->syn && th->ack);
conn->ts_tap = *now;
if (ntohl(th->seq) < conn->seq_from_tap &&
conn->seq_from_tap - ntohl(th->seq) < MAX_WINDOW) {
skip = conn->seq_from_tap - ntohl(th->seq);
}
switch (conn->state) {
case SOCK_SYN_SENT:
if (!th->syn || !th->ack) {
@ -1951,62 +2039,9 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
break;
case ESTABLISHED:
case ESTABLISHED_SOCK_FIN:
conn->ts_ack_tap = *now;
if (ntohl(th->ack_seq) > conn->seq_to_tap &&
(conn->seq_to_tap - ntohl(th->ack_seq)) > MAX_WINDOW) {
return count;
}
if (th->ack) {
if (count == 1)
tcp_sock_consume(conn, ack_seq);
if (conn->state == ESTABLISHED_SOCK_FIN) {
if (!tcp_data_from_sock(c, conn, now))
tcp_tap_state(conn, CLOSE_WAIT);
}
}
if (ntohl(th->seq) > conn->seq_from_tap) {
tcp_send_to_tap(c, conn, ACK, NULL, 0);
tcp_send_to_tap(c, conn, ACK, NULL, 0);
return count;
}
if (skip < len - off &&
tcp_send_to_sock(c, conn,
msg[0].l4h + off + skip, len - off - skip,
(count > 1) ? MSG_MORE : 0))
return 1;
if (count == 1)
tcp_send_to_tap(c, conn, ACK, NULL, 0);
if (th->fin) {
shutdown(conn->sock, SHUT_WR);
if (conn->state == ESTABLISHED)
tcp_tap_state(conn, FIN_WAIT_1);
else
tcp_tap_state(conn, LAST_ACK);
}
break;
case CLOSE_WAIT:
tcp_sock_consume(conn, ntohl(th->ack_seq));
if (skip < (len - off) &&
tcp_send_to_sock(c, conn,
msg[0].l4h + off + skip, len - off - skip,
th->psh ? 0 : MSG_MORE))
break;
if (th->fin) {
shutdown(conn->sock, SHUT_WR);
tcp_tap_state(conn, LAST_ACK);
}
break;
tcp_data_from_tap(c, conn, msg, count, now);
return count;
case FIN_WAIT_1_SOCK_FIN:
if (th->ack)
tcp_tap_destroy(c, conn);