From c3b3d5971588babeef282d2b3f75b082baecf4c3 Mon Sep 17 00:00:00 2001 From: Jakub Jedlicka Date: Tue, 26 Nov 2024 21:02:45 +0100 Subject: [PATCH] Add coverage for ArC fix when a framework bean uses the decorator This adding the coverage for https://issues.redhat.com/browse/QUARKUS-5178 It's tested with Quarkiverse extension as the reproducer attached in issue and it was only scenario which I was able to reproduce. To test this you need external lib which contains class (lib_class) implementing the interface. This interface is also aplied on class anotated as decorator in Quarkus app when the bean of lib_class is injected at the same time. This error happen randomly in dev/test mode. The Quarkus PR is /~https://github.com/quarkusio/quarkus/pull/43245 (cherry picked from commit 0d5dbd9cc21ce16466238190e9877e875864b8c8) --- README.md | 5 +++ messaging/kafka-processor/pom.xml | 33 +++++++++++++++++ .../processor/decorator/HeaderDecorator.java | 37 +++++++++++++++++++ .../processor/processor/PingProcessor.java | 15 ++++++++ .../src/main/resources/application.properties | 3 ++ .../processor/DevModeKafkaProcessorIT.java | 34 +++++++++++++++++ pom.xml | 1 + 7 files changed, 128 insertions(+) create mode 100644 messaging/kafka-processor/pom.xml create mode 100644 messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/decorator/HeaderDecorator.java create mode 100644 messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/processor/PingProcessor.java create mode 100644 messaging/kafka-processor/src/main/resources/application.properties create mode 100644 messaging/kafka-processor/src/test/java/io/quarkus/ts/messaging/kafka/processor/DevModeKafkaProcessorIT.java diff --git a/README.md b/README.md index dc1411ac2..7bc42aa40 100644 --- a/README.md +++ b/README.md @@ -974,6 +974,11 @@ There is an EventsProducer that generate stock prices events every 1s. The event A Kafka consumer will read these events serialized by AVRO and change an `status` property to `COMPLETED`. The streams of completed events will be exposed through an SSE endpoint. +### `messaging/kafka-processor` + +This module verify the [QUARKUS-5178](https://issues.redhat.com/browse/QUARKUS-5178), which using Quarkiverse extension. +This is tested only in dev mode only as it's not happening in prod mode. + ### `messaging/kafka-strimzi-avro-reactive-messaging` - Verifies that `Quarkus Kafka` + `Apicurio Kakfa Registry`(AVRO) and `Quarkus SmallRye Reactive Messaging` extensions work as expected. diff --git a/messaging/kafka-processor/pom.xml b/messaging/kafka-processor/pom.xml new file mode 100644 index 000000000..89829e954 --- /dev/null +++ b/messaging/kafka-processor/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + io.quarkus.ts.qe + parent + 1.0.0-SNAPSHOT + ../.. + + + + 2.0.1 + + kafka-processor + Quarkus QE TS: Messaging: Reactive Processor Quarkiverse + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-api + ${quarkus-kafka-streams-processor.version} + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + ${quarkus-kafka-streams-processor.version} + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor + ${quarkus-kafka-streams-processor.version} + + + diff --git a/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/decorator/HeaderDecorator.java b/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/decorator/HeaderDecorator.java new file mode 100644 index 000000000..e2e21cbf9 --- /dev/null +++ b/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/decorator/HeaderDecorator.java @@ -0,0 +1,37 @@ +package io.quarkus.ts.messaging.kafka.processor.decorator; + +import java.nio.charset.StandardCharsets; + +import jakarta.annotation.Priority; +import jakarta.decorator.Decorator; +import jakarta.decorator.Delegate; +import jakarta.inject.Inject; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; + +@Decorator +@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION + 2) +public class HeaderDecorator implements Processor { + private final Processor delegate; + + @Inject + public HeaderDecorator(@Delegate Processor delegate) { + this.delegate = delegate; + } + + @Override + public void process(Record record) { + Header header = record.headers().lastHeader("custom-header"); + if (header != null) { + String value = new String(header.value(), StandardCharsets.UTF_8); + if (value.contains("error")) { + throw new IllegalStateException("Error in header"); + } + } + delegate.process(record); + } +} diff --git a/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/processor/PingProcessor.java b/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/processor/PingProcessor.java new file mode 100644 index 000000000..9bc6cdf3f --- /dev/null +++ b/messaging/kafka-processor/src/main/java/io/quarkus/ts/messaging/kafka/processor/processor/PingProcessor.java @@ -0,0 +1,15 @@ +package io.quarkus.ts.messaging.kafka.processor.processor; + +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.Processor; + +@Processor +public class PingProcessor extends ContextualProcessor { + + @Override + public void process(Record ping) { + context().forward(ping); + } +} diff --git a/messaging/kafka-processor/src/main/resources/application.properties b/messaging/kafka-processor/src/main/resources/application.properties new file mode 100644 index 000000000..c39a6231f --- /dev/null +++ b/messaging/kafka-processor/src/main/resources/application.properties @@ -0,0 +1,3 @@ +kafkastreamsprocessor.input.topic=ping-events +kafkastreamsprocessor.output.topic=pong-events +quarkus.kafka-streams.topics=ping-events,pong-events diff --git a/messaging/kafka-processor/src/test/java/io/quarkus/ts/messaging/kafka/processor/DevModeKafkaProcessorIT.java b/messaging/kafka-processor/src/test/java/io/quarkus/ts/messaging/kafka/processor/DevModeKafkaProcessorIT.java new file mode 100644 index 000000000..b7ef557cb --- /dev/null +++ b/messaging/kafka-processor/src/test/java/io/quarkus/ts/messaging/kafka/processor/DevModeKafkaProcessorIT.java @@ -0,0 +1,34 @@ +package io.quarkus.ts.messaging.kafka.processor; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.DevModeQuarkusApplication; +import io.quarkus.ts.messaging.kafka.processor.decorator.HeaderDecorator; + +@Tag("QUARKUS-5178") +@QuarkusScenario +public class DevModeKafkaProcessorIT { + + @DevModeQuarkusApplication + static RestService app = new RestService().setAutoStart(false); + + /** + * The QUARKUS-5178 was caused only in dev mode and not all the time. + * The selected 5 runs start stops should be enough to detect the original issue + */ + @Test + public void quarkusShouldStartWithoutFailTest() { + // As QUARKUS-5178 not occurring all the time so occasionally we need to check the dev mode multiple time + for (int i = 0; i < 5; i++) { + assertDoesNotThrow(() -> app.start(), + "The QUARKUS-5178 is probably not fixed"); + app.logs().assertDoesNotContain("java.lang.ClassNotFoundException: " + HeaderDecorator.class.getName()); + app.stop(); + } + } +} diff --git a/pom.xml b/pom.xml index 575a19630..d788a5692 100644 --- a/pom.xml +++ b/pom.xml @@ -531,6 +531,7 @@ env-info messaging/amqp-reactive messaging/qpid + messaging/kafka-processor messaging/kafka-streams-reactive-messaging messaging/kafka-confluent-avro-reactive-messaging messaging/kafka-strimzi-avro-reactive-messaging