udp: Consolidate datagram batching

When we receive datagrams on a socket, we need to split them into batches
depending on how they need to be forwarded (either via a specific splice
socket, or via tap).  The logic to do this, is somewhat awkwardly split
between udp_buf_sock_handler() itself, udp_splice_send() and
udp_tap_send().

Move all the batching logic into udp_buf_sock_handler(), leaving
udp_splice_send() to just send the prepared batch.  udp_tap_send() reduces
to just a call to tap_send_frames() so open-code that call in
udp_buf_sock_handler().

This will allow separating the batching logic from the rest of the datagram
forwarding logic, which we'll need for upcoming flow table support.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
Signed-off-by: Stefano Brivio <sbrivio@redhat.com>
This commit is contained in:
David Gibson 2024-07-05 20:44:07 +10:00 committed by Stefano Brivio
parent 69e5393c37
commit be0214cca6

130
udp.c
View file

@ -501,42 +501,29 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
} }
/** /**
* udp_splice_send() - Send datagrams from socket to socket * udp_splice_send() - Send a batch of datagrams from socket to socket
* @c: Execution context * @c: Execution context
* @start: Index of first datagram in udp[46]_l2_buf * @start: Index of batch's first datagram in udp[46]_l2_buf
* @n: Total number of datagrams in udp[46]_l2_buf pool * @n: Number of datagrams in batch
* @dst: Datagrams will be sent to this port (on destination side) * @src: Source port for datagram (target side)
* @dst: Destination port for datagrams (target side)
* @ref: epoll reference for origin socket * @ref: epoll reference for origin socket
* @now: Timestamp * @now: Timestamp
*
* This consumes as many datagrams as are sendable via a single socket. It
* requires that udp_meta[@start].splicesrc is initialised, and will initialise
* udp_meta[].splicesrc for each datagram it consumes *and one more* (if
* present).
*
* Return: Number of datagrams forwarded
*/ */
static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n, static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
in_port_t dst, union epoll_ref ref, in_port_t src, in_port_t dst,
union epoll_ref ref,
const struct timespec *now) const struct timespec *now)
{ {
in_port_t src = udp_meta[start].splicesrc;
struct mmsghdr *mmh_recv;
unsigned int i = start;
int s; int s;
ASSERT(udp_meta[start].splicesrc >= 0);
ASSERT(ref.type == EPOLL_TYPE_UDP);
if (ref.udp.v6) { if (ref.udp.v6) {
mmh_recv = udp6_mh_recv;
udp_splice_to.sa6 = (struct sockaddr_in6) { udp_splice_to.sa6 = (struct sockaddr_in6) {
.sin6_family = AF_INET6, .sin6_family = AF_INET6,
.sin6_addr = in6addr_loopback, .sin6_addr = in6addr_loopback,
.sin6_port = htons(dst), .sin6_port = htons(dst),
}; };
} else { } else {
mmh_recv = udp4_mh_recv;
udp_splice_to.sa4 = (struct sockaddr_in) { udp_splice_to.sa4 = (struct sockaddr_in) {
.sin_family = AF_INET, .sin_family = AF_INET,
.sin_addr = in4addr_loopback, .sin_addr = in4addr_loopback,
@ -544,15 +531,6 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
}; };
} }
do {
udp_splice_prepare(mmh_recv, i);
if (++i >= n)
break;
udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
} while (udp_meta[i].splicesrc == src);
if (ref.udp.pif == PIF_SPLICE) { if (ref.udp.pif == PIF_SPLICE) {
src += c->udp.fwd_in.rdelta[src]; src += c->udp.fwd_in.rdelta[src];
s = udp_splice_init[ref.udp.v6][src].sock; s = udp_splice_init[ref.udp.v6][src].sock;
@ -560,7 +538,7 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
s = udp_splice_new(c, ref.udp.v6, src, false); s = udp_splice_new(c, ref.udp.v6, src, false);
if (s < 0) if (s < 0)
goto out; return;
udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec; udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec;
udp_splice_init[ref.udp.v6][src].ts = now->tv_sec; udp_splice_init[ref.udp.v6][src].ts = now->tv_sec;
@ -577,15 +555,13 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
s = arg.s; s = arg.s;
} }
if (s < 0) if (s < 0)
goto out; return;
udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec; udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec;
udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec; udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec;
} }
sendmmsg(s, udp_mh_splice + start, i - start, MSG_NOSIGNAL); sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
out:
return i - start;
} }
/** /**
@ -725,7 +701,7 @@ static size_t udp_update_hdr6(const struct ctx *c,
* @v6: Prepare for IPv6? * @v6: Prepare for IPv6?
* @now: Current timestamp * @now: Current timestamp
*/ */
static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh, static void udp_tap_prepare(const struct ctx *c, const struct mmsghdr *mmh,
unsigned idx, in_port_t dstport, bool v6, unsigned idx, in_port_t dstport, bool v6,
const struct timespec *now) const struct timespec *now)
{ {
@ -752,49 +728,6 @@ static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh,
(*tap_iov)[UDP_IOV_PAYLOAD].iov_len = l4len; (*tap_iov)[UDP_IOV_PAYLOAD].iov_len = l4len;
} }
/**
* udp_tap_send() - Prepare UDP datagrams and send to tap interface
* @c: Execution context
* @start: Index of first datagram in udp[46]_l2_buf pool
* @n: Total number of datagrams in udp[46]_l2_buf pool
* @dstport: Destination port number on destination side
* @ref: epoll reference for origin socket
* @now: Current timestamp
*
* This consumes as many frames as are sendable via tap. It requires that
* udp_meta[@start].splicesrc is initialised, and will initialise
* udp_meta[].splicesrc for each frame it consumes *and one more* (if present).
*
* Return: Number of frames sent via tap
*/
static unsigned udp_tap_send(const struct ctx *c, size_t start, size_t n,
in_port_t dstport, union epoll_ref ref,
const struct timespec *now)
{
struct mmsghdr *mmh_recv;
size_t i = start;
ASSERT(udp_meta[start].splicesrc == -1);
ASSERT(ref.type == EPOLL_TYPE_UDP);
if (ref.udp.v6)
mmh_recv = udp6_mh_recv;
else
mmh_recv = udp4_mh_recv;
do {
udp_tap_prepare(c, mmh_recv, i, dstport, ref.udp.v6, now);
if (++i >= n)
break;
udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
} while (udp_meta[i].splicesrc == -1);
tap_send_frames(c, &udp_l2_iov[start][0], UDP_NUM_IOVS, i - start);
return i - start;
}
/** /**
* udp_sock_recv() - Receive datagrams from a socket * udp_sock_recv() - Receive datagrams from a socket
* @c: Execution context * @c: Execution context
@ -842,7 +775,7 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
{ {
struct mmsghdr *mmh_recv = ref.udp.v6 ? udp6_mh_recv : udp4_mh_recv; struct mmsghdr *mmh_recv = ref.udp.v6 ? udp6_mh_recv : udp4_mh_recv;
in_port_t dstport = ref.udp.port; in_port_t dstport = ref.udp.port;
int n, m, i; int n, i;
if ((n = udp_sock_recv(c, ref.fd, events, mmh_recv)) <= 0) if ((n = udp_sock_recv(c, ref.fd, events, mmh_recv)) <= 0)
return; return;
@ -852,19 +785,38 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
else if (ref.udp.pif == PIF_HOST) else if (ref.udp.pif == PIF_HOST)
dstport += c->udp.fwd_in.f.delta[dstport]; dstport += c->udp.fwd_in.f.delta[dstport];
/* We divide things into batches based on how we need to send them, /* We divide datagrams into batches based on how we need to send them,
* determined by udp_meta[i].splicesrc. To avoid either two passes * determined by udp_meta[i].splicesrc. To avoid either two passes
* through the array, or recalculating splicesrc for a single entry, we * through the array, or recalculating splicesrc for a single entry, we
* have to populate it one entry *ahead* of the loop counter (if * have to populate it one entry *ahead* of the loop counter.
* present). So we fill in entry 0 before the loop, then udp_*_send()
* populate one entry past where they consume.
*/ */
udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv); udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv);
for (i = 0; i < n; i += m) { for (i = 0; i < n; ) {
if (udp_meta[i].splicesrc >= 0) int batchsrc = udp_meta[i].splicesrc;
m = udp_splice_send(c, i, n, dstport, ref, now); int batchstart = i;
else
m = udp_tap_send(c, i, n, dstport, ref, now); do {
if (batchsrc >= 0) {
udp_splice_prepare(mmh_recv, i);
} else {
udp_tap_prepare(c, mmh_recv, i, dstport,
ref.udp.v6, now);
}
if (++i >= n)
break;
udp_meta[i].splicesrc = udp_mmh_splice_port(ref,
&mmh_recv[i]);
} while (udp_meta[i].splicesrc == batchsrc);
if (batchsrc >= 0) {
udp_splice_send(c, batchstart, i - batchstart,
batchsrc, dstport, ref, now);
} else {
tap_send_frames(c, &udp_l2_iov[batchstart][0],
UDP_NUM_IOVS, i - batchstart);
}
} }
} }