diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6fcab528a3..e87ba93661 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,48 @@ librdkafka v2.3.1 is a feature release:
+* Fix segfault when using a long client id, because of an erased segment when using flexver (#4689).
+* Fix for an idempotent producer error, with a message batch not reconstructed
+  identically when retried (#4750).
+
+
+## Enhancements
+
+ * Update bundled lz4 (used when `./configure --disable-lz4-ext`) to
+   [v1.9.4](/~https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains
+   bugfixes and performance improvements (#4726).
+
+
+## Fixes
+
+### General fixes
+
+* Issues: [confluentinc/confluent-kafka-dotnet#2084](/~https://github.com/confluentinc/confluent-kafka-dotnet/issues/2084)
+  Fix segfault when a segment is erased and more data is written to the buffer.
+  Happens since 1.x, when a portion of the buffer (segment) is erased for flexver or compression.
+  More likely to happen since 2.1.0, because of the upgrades to flexver, with certain string sizes such as a long client id (#4689).
+
+### Idempotent producer fixes
+
+ * Issues: #4736
+   Fix for an idempotent producer error, with a message batch not reconstructed
+   identically when retried. It caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet",
+   happening on large batches. Solved by using the same backoff baseline for all messages
+   in the batch.
+   Happens since 2.2.0 (#4750).
+
+
+
+# librdkafka v2.4.0
+
+librdkafka v2.4.0 is a feature release:
+
+ * [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol.
+   **Early Access**: This should be used only for evaluation and must not be used in production. Features and contract of this KIP might change in the future (#4610).
+ * [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583).
+ * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
+   Continue partial implementation by adding a metadata cache by topic id
+   and updating the topic id corresponding to the partition name (#4676).
  * Upgrade OpenSSL to v3.0.12 (while building from source) with various security
    fixes, check the [release notes](https://www.openssl.org/news/cl30.txt).
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 357c137db8..9ea776abe9 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -896,6 +896,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         int retry_max_ms) {
         rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
         rd_kafka_msg_t *rkm, *tmp;
+        rd_ts_t now;
         int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
                                    100 + RD_KAFKA_RETRY_JITTER_PERCENT);
         /* Scan through messages to see which ones are eligible for retry,
@@ -903,7 +904,14 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
          * set backoff time for first message and optionally
          * increase retry count for each message.
          * Sorted insert is not necessary since the original order
-         * srcq order is maintained. */
+         * srcq order is maintained.
+         *
+         * A common start timestamp is used when calculating
+         * the backoff, so that messages from the same batch
+         * don't get different backoffs: with idempotency
+         * enabled they need to be retried by reconstructing
+         * the identical batch. */
+        now = rd_clock();
         TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
                 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
                         continue;
@@ -927,7 +935,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         backoff = jitter * backoff * 10;
                         if (backoff > retry_max_ms * 1000)
                                 backoff = retry_max_ms * 1000;
-                        backoff = rd_clock() + backoff;
+                        backoff = now + backoff;
                 }
                 rkm->rkm_u.producer.ts_backoff = backoff;
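
Below is a minimal standalone sketch of the idea behind the idempotent producer fix; it is not part of the patch, and msg_t, clock_us(), set_backoff_per_msg() and set_backoff_shared() are made-up names used only for illustration. Reading the clock once per batch gives every message the same absolute backoff, so the whole batch becomes eligible for retry at the same instant and can be reconstructed identically, whereas a per-message clock read may spread the batch across different retry times.

/* Standalone illustration, not librdkafka code. */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

typedef struct {
        int64_t ts_backoff; /* absolute time (us) at which the message may be retried */
} msg_t;

/* Monotonic clock in microseconds, standing in for rd_clock(). */
static int64_t clock_us(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

/* Pre-fix shape: the clock is read for each message, so messages of the
 * same batch may get slightly different ts_backoff values and become
 * retryable at different times, which can prevent the batch from being
 * regrouped identically on retry. */
static void set_backoff_per_msg(msg_t *msgs, size_t n, int64_t backoff_us) {
        for (size_t i = 0; i < n; i++)
                msgs[i].ts_backoff = clock_us() + backoff_us;
}

/* Post-fix shape: one shared baseline, every message in the batch gets
 * exactly the same ts_backoff. */
static void set_backoff_shared(msg_t *msgs, size_t n, int64_t backoff_us) {
        int64_t now = clock_us();
        for (size_t i = 0; i < n; i++)
                msgs[i].ts_backoff = now + backoff_us;
}

int main(void) {
        msg_t batch[3];

        set_backoff_per_msg(batch, 3, 100 * 1000);
        printf("per-message clock: %lld %lld %lld\n",
               (long long)batch[0].ts_backoff,
               (long long)batch[1].ts_backoff,
               (long long)batch[2].ts_backoff);

        set_backoff_shared(batch, 3, 100 * 1000);
        printf("shared baseline:   %lld %lld %lld\n",
               (long long)batch[0].ts_backoff,
               (long long)batch[1].ts_backoff,
               (long long)batch[2].ts_backoff);
        return 0;
}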