tcp, flow: Better use flow specific logging helpers

A number of places in the TCP code use general logging functions instead
of the flow-specific ones.  That includes a few older ones as well as many
places in the new migration code.  Thus they either don't identify which
flow the problem happened on, or identify it in a non-standard way.

Convert many of these to use the existing flow-specific helpers.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
Signed-off-by: Stefano Brivio <sbrivio@redhat.com>
This commit is contained in:
David Gibson 2025-03-13 13:56:17 +11:00 committed by Stefano Brivio
commit cb5b593563
6 changed files with 149 additions and 127 deletions

16
flow.c
View file

@ -1037,8 +1037,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
foreach_established_tcp_flow(flow) { foreach_established_tcp_flow(flow) {
rc = tcp_flow_migrate_source(fd, &flow->tcp); rc = tcp_flow_migrate_source(fd, &flow->tcp);
if (rc) { if (rc) {
err("Can't send data, flow %u: %s", FLOW_IDX(flow), flow_err(flow, "Can't send data: %s",
strerror_(-rc)); strerror_(-rc));
if (!first) if (!first)
die("Inconsistent migration state, exiting"); die("Inconsistent migration state, exiting");
@ -1064,8 +1064,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
foreach_established_tcp_flow(flow) { foreach_established_tcp_flow(flow) {
rc = tcp_flow_migrate_source_ext(fd, &flow->tcp); rc = tcp_flow_migrate_source_ext(fd, &flow->tcp);
if (rc) { if (rc) {
err("Extended data for flow %u: %s", FLOW_IDX(flow), flow_err(flow, "Can't send extended data: %s",
strerror_(-rc)); strerror_(-rc));
if (rc == -EIO) if (rc == -EIO)
die("Inconsistent migration state, exiting"); die("Inconsistent migration state, exiting");
@ -1112,8 +1112,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
rc = tcp_flow_migrate_target(c, fd); rc = tcp_flow_migrate_target(c, fd);
if (rc) { if (rc) {
debug("Migration data failure at flow %u: %s, abort", flow_dbg(FLOW(i), "Migration data failure, abort: %s",
i, strerror_(-rc)); strerror_(-rc));
return -rc; return -rc;
} }
} }
@ -1123,8 +1123,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd); rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd);
if (rc) { if (rc) {
debug("Migration data failure at flow %u: %s, abort", flow_dbg(FLOW(i), "Migration data failure, abort: %s",
i, strerror_(-rc)); strerror_(-rc));
return -rc; return -rc;
} }
} }

252
tcp.c
View file

@ -434,19 +434,20 @@ static struct tcp_tap_conn *conn_at_sidx(flow_sidx_t sidx)
} }
/** /**
* tcp_set_peek_offset() - Set SO_PEEK_OFF offset on a socket if supported * tcp_set_peek_offset() - Set SO_PEEK_OFF offset on connection if supported
* @s: Socket to update * @conn: Pointer to the TCP connection structure
* @offset: Offset in bytes * @offset: Offset in bytes
* *
* Return: -1 when it fails, 0 otherwise. * Return: -1 when it fails, 0 otherwise.
*/ */
int tcp_set_peek_offset(int s, int offset) int tcp_set_peek_offset(const struct tcp_tap_conn *conn, int offset)
{ {
if (!peek_offset_cap) if (!peek_offset_cap)
return 0; return 0;
if (setsockopt(s, SOL_SOCKET, SO_PEEK_OFF, &offset, sizeof(offset))) { if (setsockopt(conn->sock, SOL_SOCKET, SO_PEEK_OFF,
err("Failed to set SO_PEEK_OFF to %i in socket %i", offset, s); &offset, sizeof(offset))) {
flow_perror(conn, "Failed to set SO_PEEK_OFF to %i", offset);
return -1; return -1;
} }
return 0; return 0;
@ -1757,7 +1758,7 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
"fast re-transmit, ACK: %u, previous sequence: %u", "fast re-transmit, ACK: %u, previous sequence: %u",
max_ack_seq, conn->seq_to_tap); max_ack_seq, conn->seq_to_tap);
conn->seq_to_tap = max_ack_seq; conn->seq_to_tap = max_ack_seq;
if (tcp_set_peek_offset(conn->sock, 0)) { if (tcp_set_peek_offset(conn, 0)) {
tcp_rst(c, conn); tcp_rst(c, conn);
return -1; return -1;
} }
@ -1854,7 +1855,7 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
conn->seq_ack_to_tap = conn->seq_from_tap; conn->seq_ack_to_tap = conn->seq_from_tap;
conn_event(c, conn, ESTABLISHED); conn_event(c, conn, ESTABLISHED);
if (tcp_set_peek_offset(conn->sock, 0)) { if (tcp_set_peek_offset(conn, 0)) {
tcp_rst(c, conn); tcp_rst(c, conn);
return; return;
} }
@ -2022,7 +2023,7 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
goto reset; goto reset;
conn_event(c, conn, ESTABLISHED); conn_event(c, conn, ESTABLISHED);
if (tcp_set_peek_offset(conn->sock, 0)) if (tcp_set_peek_offset(conn, 0))
goto reset; goto reset;
if (th->fin) { if (th->fin) {
@ -2286,7 +2287,7 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
conn->seq_to_tap = conn->seq_ack_from_tap; conn->seq_to_tap = conn->seq_ack_from_tap;
if (!conn->wnd_from_tap) if (!conn->wnd_from_tap)
conn->wnd_from_tap = 1; /* Zero-window probe */ conn->wnd_from_tap = 1; /* Zero-window probe */
if (tcp_set_peek_offset(conn->sock, 0)) { if (tcp_set_peek_offset(conn, 0)) {
tcp_rst(c, conn); tcp_rst(c, conn);
} else { } else {
tcp_data_from_sock(c, conn); tcp_data_from_sock(c, conn);
@ -2810,20 +2811,21 @@ int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn)
/** /**
* tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options * tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options
* @c: Execution context * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t) static int tcp_flow_dump_tinfo(const struct tcp_tap_conn *conn,
struct tcp_tap_transfer_ext *t)
{ {
struct tcp_info tinfo; struct tcp_info tinfo;
socklen_t sl; socklen_t sl;
sl = sizeof(tinfo); sl = sizeof(tinfo);
if (getsockopt(s, SOL_TCP, TCP_INFO, &tinfo, &sl)) { if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &tinfo, &sl)) {
int rc = -errno; int rc = -errno;
err_perror("Querying TCP_INFO, socket %i", s); flow_perror(conn, "Querying TCP_INFO");
return rc; return rc;
} }
@ -2837,18 +2839,19 @@ static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG * tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG
* @c: Execution context * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t) static int tcp_flow_dump_mss(const struct tcp_tap_conn *conn,
struct tcp_tap_transfer_ext *t)
{ {
socklen_t sl = sizeof(t->mss); socklen_t sl = sizeof(t->mss);
if (getsockopt(s, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) { if (getsockopt(conn->sock, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) {
int rc = -errno; int rc = -errno;
err_perror("Getting MSS, socket %i", s); flow_perror(conn, "Getting MSS");
return rc; return rc;
} }
@ -2857,19 +2860,20 @@ static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters * tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters
* @c: Execution context * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t) static int tcp_flow_dump_wnd(const struct tcp_tap_conn *conn,
struct tcp_tap_transfer_ext *t)
{ {
struct tcp_repair_window wnd; struct tcp_repair_window wnd;
socklen_t sl = sizeof(wnd); socklen_t sl = sizeof(wnd);
if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) { if (getsockopt(conn->sock, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) {
int rc = -errno; int rc = -errno;
err_perror("Getting window repair data, socket %i", s); flow_perror(conn, "Getting window repair data");
return rc; return rc;
} }
@ -2893,12 +2897,13 @@ static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_repair_wnd() - Restore window parameters from extended data * tcp_flow_repair_wnd() - Restore window parameters from extended data
* @c: Execution context * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t) static int tcp_flow_repair_wnd(const struct tcp_tap_conn *conn,
const struct tcp_tap_transfer_ext *t)
{ {
struct tcp_repair_window wnd; struct tcp_repair_window wnd;
@ -2908,9 +2913,10 @@ static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t)
wnd.rcv_wnd = t->rcv_wnd; wnd.rcv_wnd = t->rcv_wnd;
wnd.rcv_wup = t->rcv_wup; wnd.rcv_wup = t->rcv_wup;
if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, sizeof(wnd))) { if (setsockopt(conn->sock, IPPROTO_TCP, TCP_REPAIR_WINDOW,
&wnd, sizeof(wnd))) {
int rc = -errno; int rc = -errno;
err_perror("Setting window data, socket %i", s); flow_perror(conn, "Setting window data");
return rc; return rc;
} }
@ -2919,16 +2925,17 @@ static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_select_queue() - Select queue (receive or send) for next operation * tcp_flow_select_queue() - Select queue (receive or send) for next operation
* @s: Socket * @conn: Connection to select queue for
* @queue: TCP_RECV_QUEUE or TCP_SEND_QUEUE * @queue: TCP_RECV_QUEUE or TCP_SEND_QUEUE
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_select_queue(int s, int queue) static int tcp_flow_select_queue(const struct tcp_tap_conn *conn, int queue)
{ {
if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &queue, sizeof(queue))) { if (setsockopt(conn->sock, SOL_TCP, TCP_REPAIR_QUEUE,
&queue, sizeof(queue))) {
int rc = -errno; int rc = -errno;
err_perror("Selecting TCP_SEND_QUEUE, socket %i", s); flow_perror(conn, "Selecting TCP_SEND_QUEUE");
return rc; return rc;
} }
@ -2937,26 +2944,28 @@ static int tcp_flow_select_queue(int s, int queue)
/** /**
* tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data * tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data
* @s: Socket * @conn: Connection to dump queue for
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
* *
* #syscalls:vu ioctl * #syscalls:vu ioctl
*/ */
static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t) static int tcp_flow_dump_sndqueue(const struct tcp_tap_conn *conn,
struct tcp_tap_transfer_ext *t)
{ {
int s = conn->sock;
ssize_t rc; ssize_t rc;
if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) { if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) {
rc = -errno; rc = -errno;
err_perror("Getting send queue size, socket %i", s); flow_perror(conn, "Getting send queue size");
return rc; return rc;
} }
if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) { if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) {
rc = -errno; rc = -errno;
err_perror("Getting not sent count, socket %i", s); flow_perror(conn, "Getting not sent count");
return rc; return rc;
} }
@ -2975,14 +2984,16 @@ static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t)
} }
if (t->notsent > t->sndq) { if (t->notsent > t->sndq) {
err("Invalid notsent count socket %i, send: %u, not sent: %u", flow_err(conn,
s, t->sndq, t->notsent); "Invalid notsent count socket %i, send: %u, not sent: %u",
s, t->sndq, t->notsent);
return -EINVAL; return -EINVAL;
} }
if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) { if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) {
err("Send queue too large to migrate socket %i: %u bytes", flow_err(conn,
s, t->sndq); "Send queue too large to migrate socket %i: %u bytes",
s, t->sndq);
return -ENOBUFS; return -ENOBUFS;
} }
@ -2993,13 +3004,13 @@ static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t)
rc = 0; rc = 0;
} else { } else {
rc = -errno; rc = -errno;
err_perror("Can't read send queue, socket %i", s); flow_perror(conn, "Can't read send queue");
return rc; return rc;
} }
} }
if ((uint32_t)rc < t->sndq) { if ((uint32_t)rc < t->sndq) {
err("Short read migrating send queue"); flow_err(conn, "Short read migrating send queue");
return -ENXIO; return -ENXIO;
} }
@ -3010,19 +3021,20 @@ static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue * tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue
* @s: Socket * @conn: Connection to repair queue for
* @len: Length of data to be restored * @len: Length of data to be restored
* @buf: Buffer with content of pending data queue * @buf: Buffer with content of pending data queue
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf) static int tcp_flow_repair_queue(const struct tcp_tap_conn *conn,
size_t len, uint8_t *buf)
{ {
size_t chunk = len; size_t chunk = len;
uint8_t *p = buf; uint8_t *p = buf;
while (len > 0) { while (len > 0) {
ssize_t rc = send(s, p, MIN(len, chunk), 0); ssize_t rc = send(conn->sock, p, MIN(len, chunk), 0);
if (rc < 0) { if (rc < 0) {
if ((errno == ENOBUFS || errno == ENOMEM) && if ((errno == ENOBUFS || errno == ENOMEM) &&
@ -3032,7 +3044,7 @@ static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf)
} }
rc = -errno; rc = -errno;
err_perror("Can't write queue, socket %i", s); flow_perror(conn, "Can't write queue");
return rc; return rc;
} }
@ -3045,18 +3057,18 @@ static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf)
/** /**
* tcp_flow_dump_seq() - Dump current sequence of pre-selected queue * tcp_flow_dump_seq() - Dump current sequence of pre-selected queue
* @s: Socket * @conn: Pointer to the TCP connection structure
* @v: Sequence value, set on return * @v: Sequence value, set on return
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_dump_seq(int s, uint32_t *v) static int tcp_flow_dump_seq(const struct tcp_tap_conn *conn, uint32_t *v)
{ {
socklen_t sl = sizeof(*v); socklen_t sl = sizeof(*v);
if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) { if (getsockopt(conn->sock, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) {
int rc = -errno; int rc = -errno;
err_perror("Dumping sequence, socket %i", s); flow_perror(conn, "Dumping sequence");
return rc; return rc;
} }
@ -3065,16 +3077,17 @@ static int tcp_flow_dump_seq(int s, uint32_t *v)
/** /**
* tcp_flow_repair_seq() - Restore sequence for pre-selected queue * tcp_flow_repair_seq() - Restore sequence for pre-selected queue
* @s: Socket * @conn: Connection to repair sequences for
* @v: Sequence value to be set * @v: Sequence value to be set
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_repair_seq(int s, const uint32_t *v) static int tcp_flow_repair_seq(const struct tcp_tap_conn *conn,
const uint32_t *v)
{ {
if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) { if (setsockopt(conn->sock, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) {
int rc = -errno; int rc = -errno;
err_perror("Setting sequence, socket %i", s); flow_perror(conn, "Setting sequence");
return rc; return rc;
} }
@ -3083,15 +3096,17 @@ static int tcp_flow_repair_seq(int s, const uint32_t *v)
/** /**
* tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it * tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it
* @s: Socket * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
* *
* #syscalls:vu ioctl * #syscalls:vu ioctl
*/ */
static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t) static int tcp_flow_dump_rcvqueue(const struct tcp_tap_conn *conn,
struct tcp_tap_transfer_ext *t)
{ {
int s = conn->sock;
ssize_t rc; ssize_t rc;
if (ioctl(s, SIOCINQ, &t->rcvq) < 0) { if (ioctl(s, SIOCINQ, &t->rcvq) < 0) {
@ -3111,8 +3126,9 @@ static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t)
t->rcvq--; t->rcvq--;
if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
err("Receive queue too large to migrate socket %i: %u bytes", flow_err(conn,
s, t->rcvq); "Receive queue too large to migrate socket: %u bytes",
t->rcvq);
return -ENOBUFS; return -ENOBUFS;
} }
@ -3122,13 +3138,13 @@ static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t)
rc = 0; rc = 0;
} else { } else {
rc = -errno; rc = -errno;
err_perror("Can't read receive queue for socket %i", s); flow_perror(conn, "Can't read receive queue");
return rc; return rc;
} }
} }
if ((uint32_t)rc < t->rcvq) { if ((uint32_t)rc < t->rcvq) {
err("Short read migrating receive queue"); flow_err(conn, "Short read migrating receive queue");
return -ENXIO; return -ENXIO;
} }
@ -3137,12 +3153,13 @@ static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t)
/** /**
* tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps) * tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps)
* @s: Socket * @conn: Pointer to the TCP connection structure
* @t: Extended migration data * @t: Extended migration data
* *
* Return: 0 on success, negative error code on failure * Return: 0 on success, negative error code on failure
*/ */
static int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t) static int tcp_flow_repair_opt(const struct tcp_tap_conn *conn,
const struct tcp_tap_transfer_ext *t)
{ {
const struct tcp_repair_opt opts[] = { const struct tcp_repair_opt opts[] = {
{ TCPOPT_WINDOW, t->snd_ws + (t->rcv_ws << 16) }, { TCPOPT_WINDOW, t->snd_ws + (t->rcv_ws << 16) },
@ -3156,9 +3173,9 @@ static int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t)
!!(t->tcpi_options & TCPI_OPT_SACK) + !!(t->tcpi_options & TCPI_OPT_SACK) +
!!(t->tcpi_options & TCPI_OPT_TIMESTAMPS)); !!(t->tcpi_options & TCPI_OPT_TIMESTAMPS));
if (setsockopt(s, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) { if (setsockopt(conn->sock, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) {
int rc = -errno; int rc = -errno;
err_perror("Setting repair options, socket %i", s); flow_perror(conn, "Setting repair options");
return rc; return rc;
} }
@ -3229,36 +3246,36 @@ int tcp_flow_migrate_source_ext(int fd, const struct tcp_tap_conn *conn)
/* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode /* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode
* weird. * weird.
*/ */
if (tcp_set_peek_offset(s, -1)) { if (tcp_set_peek_offset(conn, -1)) {
rc = -errno; rc = -errno;
goto fail; goto fail;
} }
if ((rc = tcp_flow_dump_tinfo(s, t))) if ((rc = tcp_flow_dump_tinfo(conn, t)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_mss(s, t))) if ((rc = tcp_flow_dump_mss(conn, t)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_wnd(s, t))) if ((rc = tcp_flow_dump_wnd(conn, t)))
goto fail; goto fail;
if ((rc = tcp_flow_select_queue(s, TCP_SEND_QUEUE))) if ((rc = tcp_flow_select_queue(conn, TCP_SEND_QUEUE)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_sndqueue(s, t))) if ((rc = tcp_flow_dump_sndqueue(conn, t)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_seq(s, &t->seq_snd))) if ((rc = tcp_flow_dump_seq(conn, &t->seq_snd)))
goto fail; goto fail;
if ((rc = tcp_flow_select_queue(s, TCP_RECV_QUEUE))) if ((rc = tcp_flow_select_queue(conn, TCP_RECV_QUEUE)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_rcvqueue(s, t))) if ((rc = tcp_flow_dump_rcvqueue(conn, t)))
goto fail; goto fail;
if ((rc = tcp_flow_dump_seq(s, &t->seq_rcv))) if ((rc = tcp_flow_dump_seq(conn, &t->seq_rcv)))
goto fail; goto fail;
close(s); close(s);
@ -3269,14 +3286,14 @@ int tcp_flow_migrate_source_ext(int fd, const struct tcp_tap_conn *conn)
t->seq_rcv -= t->rcvq; t->seq_rcv -= t->rcvq;
t->seq_snd -= t->sndq; t->seq_snd -= t->sndq;
debug("Extended migration data, socket %i sequences send %u receive %u", flow_dbg(conn, "Extended migration data, socket %i sequences send %u receive %u",
s, t->seq_snd, t->seq_rcv); s, t->seq_snd, t->seq_rcv);
debug(" pending queues: send %u not sent %u receive %u", flow_dbg(conn, " pending queues: send %u not sent %u receive %u",
t->sndq, t->notsent, t->rcvq); t->sndq, t->notsent, t->rcvq);
debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", flow_dbg(conn, " window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup); t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup);
debug(" SO_PEEK_OFF %s offset=%"PRIu32, flow_dbg(conn, " SO_PEEK_OFF %s offset=%"PRIu32,
peek_offset_cap ? "enabled" : "disabled", peek_offset); peek_offset_cap ? "enabled" : "disabled", peek_offset);
/* Endianness fix-ups */ /* Endianness fix-ups */
t->seq_snd = htonl(t->seq_snd); t->seq_snd = htonl(t->seq_snd);
@ -3292,17 +3309,17 @@ int tcp_flow_migrate_source_ext(int fd, const struct tcp_tap_conn *conn)
t->rcv_wup = htonl(t->rcv_wup); t->rcv_wup = htonl(t->rcv_wup);
if (write_all_buf(fd, t, sizeof(*t))) { if (write_all_buf(fd, t, sizeof(*t))) {
err_perror("Failed to write extended data, socket %i", s); flow_perror(conn, "Failed to write extended data");
return -EIO; return -EIO;
} }
if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) { if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) {
err_perror("Failed to write send queue data, socket %i", s); flow_perror(conn, "Failed to write send queue data");
return -EIO; return -EIO;
} }
if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) { if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) {
err_perror("Failed to write receive queue data, socket %i", s); flow_perror(conn, "Failed to write receive queue data");
return -EIO; return -EIO;
} }
@ -3317,7 +3334,7 @@ fail:
t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */ t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */
if (write_all_buf(fd, t, sizeof(*t))) { if (write_all_buf(fd, t, sizeof(*t))) {
err_perror("Failed to write extended data, socket %i", s); flow_perror(conn, "Failed to write extended data");
return -EIO; return -EIO;
} }
@ -3347,19 +3364,20 @@ static int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn)
if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
IPPROTO_TCP)) < 0) { IPPROTO_TCP)) < 0) {
rc = -errno; rc = -errno;
err_perror("Failed to create socket for migrated flow"); flow_perror(conn, "Failed to create socket for migrated flow");
return rc; return rc;
} }
s = conn->sock; s = conn->sock;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int))) if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int)))
debug_perror("Setting SO_REUSEADDR on socket %i", s); flow_dbg_perror(conn, "Failed to set SO_REUSEADDR on socket %i",
s);
tcp_sock_set_nodelay(s); tcp_sock_set_nodelay(s);
if (bind(s, &a.sa, sizeof(a))) { if (bind(s, &a.sa, sizeof(a))) {
rc = -errno; rc = -errno;
err_perror("Failed to bind socket for migrated flow"); flow_perror(conn, "Failed to bind socket for migrated flow");
goto err; goto err;
} }
@ -3390,7 +3408,7 @@ static int tcp_flow_repair_connect(const struct ctx *c,
rc = flowside_connect(c, conn->sock, PIF_HOST, tgt); rc = flowside_connect(c, conn->sock, PIF_HOST, tgt);
if (rc) { if (rc) {
rc = -errno; rc = -errno;
err_perror("Failed to connect migrated socket %i", conn->sock); flow_perror(conn, "Failed to connect migrated socket");
return rc; return rc;
} }
@ -3421,8 +3439,8 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
} }
if (read_all_buf(fd, &t, sizeof(t))) { if (read_all_buf(fd, &t, sizeof(t))) {
flow_perror(flow, "Failed to receive migration data");
flow_alloc_cancel(flow); flow_alloc_cancel(flow);
err_perror("Failed to receive migration data");
return -errno; return -errno;
} }
@ -3481,7 +3499,7 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
if (read_all_buf(fd, &t, sizeof(t))) { if (read_all_buf(fd, &t, sizeof(t))) {
rc = -errno; rc = -errno;
err_perror("Failed to read extended data for socket %i", s); flow_perror(conn, "Failed to read extended data");
return rc; return rc;
} }
@ -3503,31 +3521,34 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
t.rcv_wnd = ntohl(t.rcv_wnd); t.rcv_wnd = ntohl(t.rcv_wnd);
t.rcv_wup = ntohl(t.rcv_wup); t.rcv_wup = ntohl(t.rcv_wup);
debug("Extended migration data, socket %i sequences send %u receive %u", flow_dbg(conn,
s, t.seq_snd, t.seq_rcv); "Extended migration data, socket %i sequences send %u receive %u",
debug(" pending queues: send %u not sent %u receive %u", s, t.seq_snd, t.seq_rcv);
t.sndq, t.notsent, t.rcvq); flow_dbg(conn, " pending queues: send %u not sent %u receive %u",
debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", t.sndq, t.notsent, t.rcvq);
t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup); flow_dbg(conn,
debug(" SO_PEEK_OFF %s offset=%"PRIu32, " window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
peek_offset_cap ? "enabled" : "disabled", peek_offset); t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup);
flow_dbg(conn, " SO_PEEK_OFF %s offset=%"PRIu32,
peek_offset_cap ? "enabled" : "disabled", peek_offset);
if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq || if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq ||
t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
err("Bad queues socket %i, send: %u, not sent: %u, receive: %u", flow_err(conn,
s, t.sndq, t.notsent, t.rcvq); "Bad queues socket %i, send: %u, not sent: %u, receive: %u",
s, t.sndq, t.notsent, t.rcvq);
return -EINVAL; return -EINVAL;
} }
if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) { if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) {
rc = -errno; rc = -errno;
err_perror("Failed to read send queue data, socket %i", s); flow_perror(conn, "Failed to read send queue data");
return rc; return rc;
} }
if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) { if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) {
rc = -errno; rc = -errno;
err_perror("Failed to read receive queue data, socket %i", s); flow_perror(conn, "Failed to read receive queue data");
return rc; return rc;
} }
@ -3535,32 +3556,32 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
/* We weren't able to create the socket, discard flow */ /* We weren't able to create the socket, discard flow */
goto fail; goto fail;
if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) if (tcp_flow_select_queue(conn, TCP_SEND_QUEUE))
goto fail; goto fail;
if (tcp_flow_repair_seq(s, &t.seq_snd)) if (tcp_flow_repair_seq(conn, &t.seq_snd))
goto fail; goto fail;
if (tcp_flow_select_queue(s, TCP_RECV_QUEUE)) if (tcp_flow_select_queue(conn, TCP_RECV_QUEUE))
goto fail; goto fail;
if (tcp_flow_repair_seq(s, &t.seq_rcv)) if (tcp_flow_repair_seq(conn, &t.seq_rcv))
goto fail; goto fail;
if (tcp_flow_repair_connect(c, conn)) if (tcp_flow_repair_connect(c, conn))
goto fail; goto fail;
if (tcp_flow_repair_queue(s, t.rcvq, tcp_migrate_rcv_queue)) if (tcp_flow_repair_queue(conn, t.rcvq, tcp_migrate_rcv_queue))
goto fail; goto fail;
if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) if (tcp_flow_select_queue(conn, TCP_SEND_QUEUE))
goto fail; goto fail;
if (tcp_flow_repair_queue(s, t.sndq - t.notsent, if (tcp_flow_repair_queue(conn, t.sndq - t.notsent,
tcp_migrate_snd_queue)) tcp_migrate_snd_queue))
goto fail; goto fail;
if (tcp_flow_repair_opt(s, &t)) if (tcp_flow_repair_opt(conn, &t))
goto fail; goto fail;
/* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't /* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't
@ -3575,19 +3596,19 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
v = TCP_SEND_QUEUE; v = TCP_SEND_QUEUE;
if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v))) if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v)))
debug_perror("Selecting repair queue, socket %i", s); flow_perror(conn, "Selecting repair queue");
else else
shutdown(s, SHUT_WR); shutdown(s, SHUT_WR);
} }
if (tcp_flow_repair_wnd(s, &t)) if (tcp_flow_repair_wnd(conn, &t))
goto fail; goto fail;
tcp_flow_repair_off(c, conn); tcp_flow_repair_off(c, conn);
repair_flush(c); repair_flush(c);
if (t.notsent) { if (t.notsent) {
if (tcp_flow_repair_queue(s, t.notsent, if (tcp_flow_repair_queue(conn, t.notsent,
tcp_migrate_snd_queue + tcp_migrate_snd_queue +
(t.sndq - t.notsent))) { (t.sndq - t.notsent))) {
/* This sometimes seems to fail for unclear reasons. /* This sometimes seems to fail for unclear reasons.
@ -3607,15 +3628,16 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
if (t.tcpi_state == TCP_FIN_WAIT1) if (t.tcpi_state == TCP_FIN_WAIT1)
shutdown(s, SHUT_WR); shutdown(s, SHUT_WR);
if (tcp_set_peek_offset(conn->sock, peek_offset)) if (tcp_set_peek_offset(conn, peek_offset))
goto fail; goto fail;
tcp_send_flag(c, conn, ACK); tcp_send_flag(c, conn, ACK);
tcp_data_from_sock(c, conn); tcp_data_from_sock(c, conn);
if ((rc = tcp_epoll_ctl(c, conn))) { if ((rc = tcp_epoll_ctl(c, conn))) {
debug("Failed to subscribe to epoll for migrated socket %i: %s", flow_dbg(conn,
conn->sock, strerror_(-rc)); "Failed to subscribe to epoll for migrated socket: %s",
strerror_(-rc));
goto fail; goto fail;
} }

1
tcp.h
View file

@ -25,7 +25,6 @@ void tcp_timer(struct ctx *c, const struct timespec *now);
void tcp_defer_handler(struct ctx *c); void tcp_defer_handler(struct ctx *c);
void tcp_update_l2_buf(const unsigned char *eth_d, const unsigned char *eth_s); void tcp_update_l2_buf(const unsigned char *eth_d, const unsigned char *eth_s);
int tcp_set_peek_offset(int s, int offset);
extern bool peek_offset_cap; extern bool peek_offset_cap;

View file

@ -125,7 +125,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
conn->seq_to_tap = seq; conn->seq_to_tap = seq;
peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
if (tcp_set_peek_offset(conn->sock, peek_offset)) if (tcp_set_peek_offset(conn, peek_offset))
tcp_rst(c, conn); tcp_rst(c, conn);
} }
} }
@ -304,7 +304,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
conn->seq_ack_from_tap, conn->seq_to_tap); conn->seq_ack_from_tap, conn->seq_to_tap);
conn->seq_to_tap = conn->seq_ack_from_tap; conn->seq_to_tap = conn->seq_ack_from_tap;
already_sent = 0; already_sent = 0;
if (tcp_set_peek_offset(s, 0)) { if (tcp_set_peek_offset(conn, 0)) {
tcp_rst(c, conn); tcp_rst(c, conn);
return -1; return -1;
} }

View file

@ -177,5 +177,6 @@ int tcp_update_seqack_wnd(const struct ctx *c, struct tcp_tap_conn *conn,
int tcp_prepare_flags(const struct ctx *c, struct tcp_tap_conn *conn, int tcp_prepare_flags(const struct ctx *c, struct tcp_tap_conn *conn,
int flags, struct tcphdr *th, struct tcp_syn_opts *opts, int flags, struct tcphdr *th, struct tcp_syn_opts *opts,
size_t *optlen); size_t *optlen);
int tcp_set_peek_offset(const struct tcp_tap_conn *conn, int offset);
#endif /* TCP_INTERNAL_H */ #endif /* TCP_INTERNAL_H */

View file

@ -376,7 +376,7 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
conn->seq_ack_from_tap, conn->seq_to_tap); conn->seq_ack_from_tap, conn->seq_to_tap);
conn->seq_to_tap = conn->seq_ack_from_tap; conn->seq_to_tap = conn->seq_ack_from_tap;
already_sent = 0; already_sent = 0;
if (tcp_set_peek_offset(conn->sock, 0)) { if (tcp_set_peek_offset(conn, 0)) {
tcp_rst(c, conn); tcp_rst(c, conn);
return -1; return -1;
} }