Skip to content

Commit

Permalink
Socket options are now all set before connection (#4893)
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab authored Dec 11, 2024
1 parent 63e18cd commit 785688e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 40 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# librdkafka v2.6.2

librdkafka v2.6.2 is a maintenance release:

* Socket options are now all set before connection (#4893).


## Fixes

### General fixes

* Socket options are now all set before connection, as [documentation](https://man7.org/linux/man-pages/man7/tcp.7.html)
says it's needed for socket buffers to take effect, even if in some
cases they could have effect even after connection.
Happening since v0.9.0 (#4893).



# librdkafka v2.6.1

librdkafka v2.6.1 is a maintenance release:
Expand Down
79 changes: 39 additions & 40 deletions src/rdkafka_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,33 +543,6 @@ void rd_kafka_transport_post_connect_setup(rd_kafka_transport_t *rktrans) {
rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
unsigned int slen;

/* Set socket send & receive buffer sizes if configuerd */
if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
if (setsockopt(
rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_sndbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
"Failed to set socket send "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_sndbuf_size,
rd_socket_strerror(rd_socket_errno));
}

if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
if (setsockopt(
rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_rcvbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
"Failed to set socket receive "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
rd_socket_strerror(rd_socket_errno));
}

/* Get send and receive buffer sizes to allow limiting
* the total number of bytes passed with iovecs to sendmsg()
* and recvmsg(). */
Expand Down Expand Up @@ -598,19 +571,6 @@ void rd_kafka_transport_post_connect_setup(rd_kafka_transport_t *rktrans) {
} else if (rktrans->rktrans_sndbuf_size < 1024 * 64)
rktrans->rktrans_sndbuf_size =
1024 * 64; /* Use at least 64KB */


#ifdef TCP_NODELAY
if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
int one = 1;
if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
(void *)&one, sizeof(one)) == RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
"Failed to disable Nagle (TCP_NODELAY) "
"on socket: %s",
rd_socket_strerror(rd_socket_errno));
}
#endif
}


Expand Down Expand Up @@ -1081,6 +1041,45 @@ rd_kafka_transport_t *rd_kafka_transport_new(rd_kafka_broker_t *rkb,
}
#endif

#ifdef TCP_NODELAY
if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
int one = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (void *)&one,
sizeof(one)) == RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
"Failed to disable Nagle (TCP_NODELAY) "
"on socket: %s",
rd_socket_strerror(rd_socket_errno));
}
#endif

/* Set socket send & receive buffer sizes if configuerd */
if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
if (setsockopt(
s, SOL_SOCKET, SO_SNDBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_sndbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
"Failed to set socket send "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_sndbuf_size,
rd_socket_strerror(rd_socket_errno));
}

if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
if (setsockopt(
s, SOL_SOCKET, SO_RCVBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_rcvbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
"Failed to set socket receive "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
rd_socket_strerror(rd_socket_errno));
}

/* Set the socket to non-blocking */
if ((r = rd_fd_set_nonblocking(s))) {
rd_snprintf(errstr, errstr_size,
Expand Down

0 comments on commit 785688e

Please sign in to comment.