Skip to content

Commit

Permalink
Exposing MessageState in ServiceBusReceivedMessage (#26897)
Browse files Browse the repository at this point in the history
* Adding ServiceBusMessageState.

* Exposing ServiceBusMessageState and returning it.

* Add CHANGELOG entry

* Adding tests.
  • Loading branch information
conniey authored Feb 7, 2022
1 parent 7ee1c99 commit 071d6b3
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.6.0-beta.1 (Unreleased)

### Features Added
- Add `ServiceBusMessageState` property to received messages which indicates whether the message is active, scheduled or deferred. It is exposed it in `ServiceBusReceivedMessage.getMessageState()`. ([#25217](/~https://github.com/Azure/azure-sdk-for-java/issues/25217))

### Bugs Fixed

- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](/~https://github.com/Azure/azure-sdk-for-java/issues/25182))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusMessageState;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

import java.time.Duration;
Expand Down Expand Up @@ -46,12 +47,14 @@
*/
public final class ServiceBusReceivedMessage {
private final ClientLogger logger = new ClientLogger(ServiceBusReceivedMessage.class);

private final AmqpAnnotatedMessage amqpAnnotatedMessage;

private UUID lockToken;
private boolean isSettled = false;
private Context context;

static final String SERVICE_BUS_MESSAGE_STATE_KEY = "x-opt-message-state";

ServiceBusReceivedMessage(BinaryData body) {
Objects.requireNonNull(body, "'body' cannot be null.");
amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes()));
Expand Down Expand Up @@ -320,6 +323,25 @@ public String getMessageId() {
return messageId;
}

/**
* Gets the state of the message.
*
* The state of the message can be Active, Deferred, or Scheduled. Deferred messages have Deferred state, scheduled
* messages have Scheduled state, all other messages have Active state.
*
* @return The state of the message.
* @throws UnsupportedOperationException if the message state is an unknown value.
*/
public ServiceBusMessageState getMessageState() {
final Object value = amqpAnnotatedMessage.getMessageAnnotations().get(SERVICE_BUS_MESSAGE_STATE_KEY);

if (value instanceof Integer) {
return ServiceBusMessageState.fromValue((Integer) value);
} else {
return ServiceBusMessageState.ACTIVE;
}
}

/**
* Gets the partition key for sending a message to a partitioned entity.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus.models;

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;

/**
* Represents the message state of the {@link ServiceBusReceivedMessage}.
*/
public enum ServiceBusMessageState {
/**
* Specifies an active message state.
*/
ACTIVE(0),
/**
* Specifies a deferred message state.
*/
DEFERRED(1),
/**
* Specifies a scheduled message state.
*/
SCHEDULED(2);

private final int value;

ServiceBusMessageState(int value) {
this.value = value;
}

/**
* Gets the value of the message state.
*
* @return The value of the message state.
*/
public int getValue() {
return value;
}

/**
* Gets the message state from {@code value}.
*
* @param value Integer value of the message state.
*
* @return The corresponding message state.
*
* @throws UnsupportedOperationException if {@code value} is not a known message state.
*/
public static ServiceBusMessageState fromValue(int value) {
switch (value) {
case 0:
return ServiceBusMessageState.ACTIVE;
case 1:
return ServiceBusMessageState.DEFERRED;
case 2:
return ServiceBusMessageState.SCHEDULED;
default:
throw new UnsupportedOperationException(
"Value is not supported. Should be 0(ACTIVE), 1(DEFERRED), or 2(SCHEDULED). Actual: " + value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.models.ServiceBusMessageState;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
import java.util.stream.Stream;

import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME;
Expand All @@ -22,11 +27,13 @@
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
import static com.azure.messaging.servicebus.ServiceBusReceivedMessage.SERVICE_BUS_MESSAGE_STATE_KEY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -147,6 +154,45 @@ public void toServiceBusMessageTest() {
assertNull(actual.getRawAmqpMessage().getHeader().getDeliveryCount());
}

public static Stream<Arguments> canGetMessageState() {
return Stream.of(
Arguments.of(0, ServiceBusMessageState.ACTIVE),
Arguments.of(1, ServiceBusMessageState.DEFERRED),
Arguments.of(2, ServiceBusMessageState.SCHEDULED)
);
}

@MethodSource
@ParameterizedTest
public void canGetMessageState(Integer value, ServiceBusMessageState expected) {
// Arrange
final ServiceBusReceivedMessage message = new ServiceBusReceivedMessage(PAYLOAD_BINARY);
message.getRawAmqpMessage().getMessageAnnotations().put(SERVICE_BUS_MESSAGE_STATE_KEY, value);

// Act
final ServiceBusMessageState actual = message.getMessageState();

// Assert
assertEquals(expected, actual);
}

@Test
public void defaultMessageState() {
final ServiceBusReceivedMessage message = new ServiceBusReceivedMessage(PAYLOAD_BINARY);

assertEquals(ServiceBusMessageState.ACTIVE, message.getMessageState());
}

@Test
public void throwsOnInvalidMessageState() {
// Arrange
final ServiceBusReceivedMessage message = new ServiceBusReceivedMessage(PAYLOAD_BINARY);
message.getRawAmqpMessage().getMessageAnnotations().put(SERVICE_BUS_MESSAGE_STATE_KEY, 10);

// Act & Assert
assertThrows(UnsupportedOperationException.class, () -> message.getMessageState());
}

public void assertNullValues(Map<String, Object> dataMap, AmqpMessageConstant... keys) {
for (AmqpMessageConstant key : keys) {
assertNull(dataMap.get(key.getValue()));
Expand Down

0 comments on commit 071d6b3

Please sign in to comment.