Skip to content

Commit

Permalink
Merge pull request #2676 from ozangunalp/flaky_kafka_request_reply_test
Browse files Browse the repository at this point in the history
Attempt to fix flaky kafka request reply test
  • Loading branch information
cescoffier authored Jul 3, 2024
2 parents 198a76d + 228349a commit a46e9ec
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void testLazyInitializedProducer() {
Map<String, Object> props = new HashMap<>();
props.put("tracing-enabled", false);
props.put("lazy-client", true);
props.put("retries", 0L);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "invalid-bootstrap-servers");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -468,13 +469,14 @@ void testReplyOffsetResetEarliest() {
.with("reply.auto.offset.reset", "earliest"), RequestReplyProducer.class);

for (int i = 0; i < 10; i++) {
app.requestReply().request(i).subscribe().with(replies::add);
app.requestReply().request(Message.of(i, Metadata.of(OutgoingKafkaRecordMetadata.builder()
.withKey("" + i).build()))).subscribe().with(r -> replies.add(r.getPayload()));
}
await().untilAsserted(() -> assertThat(replies).hasSize(10));
assertThat(replies).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");

assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion())
.extracting(ConsumerRecord::value).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
.extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}

@Test
Expand Down

0 comments on commit a46e9ec

Please sign in to comment.