Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socket options are now all set before connection #4893

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# librdkafka v2.6.2

librdkafka v2.6.2 is a maintenance release:

* Socket options are now all set before connection (#4893).


## Fixes

### General fixes

* Socket options are now all set before connection, as [documentation](https://man7.org/linux/man-pages/man7/tcp.7.html)
says it's needed for socket buffers to take effect, even if in some
cases they could have effect even after connection.
Happening since v0.9.0 (#4893).



# librdkafka v2.6.1

librdkafka v2.6.1 is a maintenance release:
Expand Down
79 changes: 39 additions & 40 deletions src/rdkafka_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,33 +543,6 @@ void rd_kafka_transport_post_connect_setup(rd_kafka_transport_t *rktrans) {
rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
unsigned int slen;

/* Set socket send & receive buffer sizes if configuerd */
if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
if (setsockopt(
rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_sndbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
"Failed to set socket send "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_sndbuf_size,
rd_socket_strerror(rd_socket_errno));
}

if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
if (setsockopt(
rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_rcvbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
"Failed to set socket receive "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
rd_socket_strerror(rd_socket_errno));
}

/* Get send and receive buffer sizes to allow limiting
* the total number of bytes passed with iovecs to sendmsg()
* and recvmsg(). */
Expand Down Expand Up @@ -598,19 +571,6 @@ void rd_kafka_transport_post_connect_setup(rd_kafka_transport_t *rktrans) {
} else if (rktrans->rktrans_sndbuf_size < 1024 * 64)
rktrans->rktrans_sndbuf_size =
1024 * 64; /* Use at least 64KB */


#ifdef TCP_NODELAY
if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
int one = 1;
if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
(void *)&one, sizeof(one)) == RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
"Failed to disable Nagle (TCP_NODELAY) "
"on socket: %s",
rd_socket_strerror(rd_socket_errno));
}
#endif
}


Expand Down Expand Up @@ -1081,6 +1041,45 @@ rd_kafka_transport_t *rd_kafka_transport_new(rd_kafka_broker_t *rkb,
}
#endif

#ifdef TCP_NODELAY
if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
int one = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (void *)&one,
sizeof(one)) == RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
"Failed to disable Nagle (TCP_NODELAY) "
"on socket: %s",
rd_socket_strerror(rd_socket_errno));
}
#endif

/* Set socket send & receive buffer sizes if configuerd */
if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
if (setsockopt(
s, SOL_SOCKET, SO_SNDBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_sndbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
"Failed to set socket send "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_sndbuf_size,
rd_socket_strerror(rd_socket_errno));
}

if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
if (setsockopt(
s, SOL_SOCKET, SO_RCVBUF,
(void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
sizeof(rkb->rkb_rk->rk_conf.socket_rcvbuf_size)) ==
RD_SOCKET_ERROR)
rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
"Failed to set socket receive "
"buffer size to %i: %s",
rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
rd_socket_strerror(rd_socket_errno));
}

/* Set the socket to non-blocking */
if ((r = rd_fd_set_nonblocking(s))) {
rd_snprintf(errstr, errstr_size,
Expand Down