Skip to content

Commit

Permalink
Amqp core metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Aug 29, 2022
1 parent d981605 commit ed45b5e
Show file tree
Hide file tree
Showing 41 changed files with 1,917 additions and 134 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ public final class ClientConstants {
public static final String CONNECTION_ID_KEY = "connectionId";
public static final String LINK_NAME_KEY = "linkName";
public static final String ENTITY_PATH_KEY = "entityPath";
public static final String ENTITY_NAME_KEY = "entityName";
public static final String SESSION_NAME_KEY = "sessionName";
public static final String FULLY_QUALIFIED_NAMESPACE_KEY = "namespace";
public static final String OPERATION_NAME_KEY = "amqpOperation";
public static final String DELIVERY_STATE_KEY = "deliveryState";
public static final String ERROR_CONDITION_KEY = "errorCondition";
public static final String ERROR_DESCRIPTION_KEY = "errorDescription";
public static final String EMIT_RESULT_KEY = "emitResult";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.SIGNAL_TYPE_KEY;
import static com.azure.core.util.FluxUtil.monoError;

/**
* An AMQP connection backed by proton-j.
Expand Down Expand Up @@ -192,12 +193,8 @@ public Flux<AmqpShutdownSignal> getShutdownSignals() {
public Mono<AmqpManagementNode> getManagementNode(String entityPath) {
return Mono.defer(() -> {
if (isDisposed()) {
// TODO(limolkova) this can be simplified with FluxUtil.monoError(LoggingEventBuilder), not using it for now
// to allow using azure-core-amqp with stable azure-core 1.24.0 to simplify dependency management
// we should switch to it once monoError(LoggingEventBuilder) ships in stable azure-core
return Mono.error(logger.atError()
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.log(Exceptions.propagate(new IllegalStateException("Connection is disposed. Cannot get management instance."))));
return monoError(logger.atError().addKeyValue(ENTITY_PATH_KEY, entityPath),
Exceptions.propagate(new IllegalStateException("Connection is disposed. Cannot get management instance.")));
}

final AmqpManagementNode existing = managementNodes.get(entityPath);
Expand Down Expand Up @@ -404,7 +401,7 @@ protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChan
.cast(ReactorSession.class)
.map(reactorSession -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), linkName,
entityPath, reactorSession.session(), connectionOptions.getRetry(), handlerProvider, reactorProvider,
messageSerializer, senderSettleMode, receiverSettleMode))
messageSerializer, senderSettleMode, receiverSettleMode, handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), entityPath)))
.doOnNext(e -> {
logger.atInfo()
.addKeyValue(ENTITY_PATH_KEY, entityPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,24 @@
import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler;
import com.azure.core.amqp.implementation.handler.WebSocketsProxyConnectionHandler;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.reactor.Reactor;

import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* Provides handlers for the various types of links.
*/
public class ReactorHandlerProvider {
private static final ClientLogger LOGGER = new ClientLogger(ReactorHandlerProvider.class);
private final ConcurrentHashMap<String, AmqpMetricsProvider> metricsCache = new ConcurrentHashMap<>();
private final Meter meter;

private final ReactorProvider provider;

/**
Expand All @@ -34,9 +39,24 @@ public class ReactorHandlerProvider {
* @param provider The provider that creates and manages {@link Reactor} instances.
*
* @throws NullPointerException If {@code provider} is {@code null}.
* @deprecated use {@link ReactorHandlerProvider#ReactorHandlerProvider(ReactorProvider, Meter)} instead.
*/
@Deprecated
public ReactorHandlerProvider(ReactorProvider provider) {
this(provider, null);
}

/**
* Creates a new instance with the reactor provider to handle {@link ReactorDispatcher ReactorDispatchers} to its
* generated handlers.
*
* @param provider The provider that creates and manages {@link Reactor} instances.
*
* @throws NullPointerException If {@code provider} is {@code null}.
*/
public ReactorHandlerProvider(ReactorProvider provider, Meter meter) {
this.provider = Objects.requireNonNull(provider, "'provider' cannot be null.");
this.meter = meter;
}

/**
Expand All @@ -53,10 +73,11 @@ public ConnectionHandler createConnectionHandler(String connectionId, Connection
Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
Objects.requireNonNull(options, "'options' cannot be null.");

AmqpMetricsProvider metricsProvider = getMetricProvider(options.getFullyQualifiedNamespace(), null);
if (options.getTransportType() == AmqpTransportType.AMQP) {
final SslPeerDetails peerDetails = Proton.sslPeerDetails(options.getHostname(), options.getPort());

return new ConnectionHandler(connectionId, options, peerDetails);
return new ConnectionHandler(connectionId, options, peerDetails, metricsProvider);
}

if (options.getTransportType() != AmqpTransportType.AMQP_WEB_SOCKETS) {
Expand Down Expand Up @@ -84,22 +105,22 @@ public ConnectionHandler createConnectionHandler(String connectionId, Connection

final SslPeerDetails peerDetails = Proton.sslPeerDetails(options.getHostname(), options.getPort());

return new WebSocketsProxyConnectionHandler(connectionId, options, options.getProxyOptions(), peerDetails);
return new WebSocketsProxyConnectionHandler(connectionId, options, options.getProxyOptions(), peerDetails, metricsProvider);
} else if (isSystemProxyConfigured) {
LOGGER.info("System default proxy configured for hostname:port '{}:{}'. Using proxy.",
options.getFullyQualifiedNamespace(), options.getPort());

final SslPeerDetails peerDetails = Proton.sslPeerDetails(options.getHostname(), options.getPort());

return new WebSocketsProxyConnectionHandler(connectionId, options, ProxyOptions.SYSTEM_DEFAULTS,
peerDetails);
peerDetails, metricsProvider);
}

final SslPeerDetails peerDetails = isCustomEndpointConfigured
? Proton.sslPeerDetails(options.getHostname(), options.getPort())
: Proton.sslPeerDetails(options.getFullyQualifiedNamespace(), options.getPort());

return new WebSocketsConnectionHandler(connectionId, options, peerDetails);
return new WebSocketsConnectionHandler(connectionId, options, peerDetails, metricsProvider);
}

/**
Expand All @@ -115,7 +136,7 @@ public SessionHandler createSessionHandler(String connectionId, String hostname,
Duration openTimeout) {

return new SessionHandler(connectionId, hostname, sessionName, provider.getReactorDispatcher(),
openTimeout);
openTimeout, getMetricProvider(hostname, sessionName));
}

/**
Expand All @@ -129,7 +150,7 @@ public SessionHandler createSessionHandler(String connectionId, String hostname,
*/
public SendLinkHandler createSendLinkHandler(String connectionId, String hostname,
String senderName, String entityPath) {
return new SendLinkHandler(connectionId, hostname, senderName, entityPath);
return new SendLinkHandler(connectionId, hostname, senderName, entityPath, getMetricProvider(hostname, entityPath));
}

/**
Expand All @@ -143,6 +164,21 @@ public SendLinkHandler createSendLinkHandler(String connectionId, String hostnam
public ReceiveLinkHandler createReceiveLinkHandler(String connectionId, String hostname,
String receiverName, String entityPath) {

return new ReceiveLinkHandler(connectionId, hostname, receiverName, entityPath);
return new ReceiveLinkHandler(connectionId, hostname, receiverName, entityPath, getMetricProvider(hostname, entityPath));
}

/**
* Returns cached {@link AmqpMetricsProvider} (or creates one) for given meter and entity.
* It's recommended to keep returned value in instance variable and to avoid calling
* this method extensively.
*/
AmqpMetricsProvider getMetricProvider(String namespace, String entityPath) {
if (meter != null && !meter.isEnabled()) {
return AmqpMetricsProvider.noop();
}

return metricsCache.computeIfAbsent(
namespace + (entityPath == null ? "" : "/" + entityPath),
ignored -> new AmqpMetricsProvider(meter, namespace, entityPath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,25 @@ public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoClo
private final Sinks.Empty<AmqpEndpointState> terminateEndpointStates = Sinks.empty();

private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();
private final AmqpMetricsProvider metricsProvider;

@Deprecated
protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Receiver receiver,
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher,
AmqpRetryOptions retryOptions) {
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher,
AmqpRetryOptions retryOptions) {
this(amqpConnection, entityPath, receiver, handler, tokenManager, dispatcher, retryOptions,
new AmqpMetricsProvider(null, amqpConnection.getFullyQualifiedNamespace(), entityPath));
}

protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Receiver receiver,
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher,
AmqpRetryOptions retryOptions, AmqpMetricsProvider metricsProvider) {
this.entityPath = entityPath;
this.receiver = receiver;
this.handler = handler;
this.tokenManager = tokenManager;
this.dispatcher = dispatcher;
this.metricsProvider = metricsProvider;

Map<String, Object> loggingContext = createContextWithConnectionId(handler.getConnectionId());
loggingContext.put(LINK_NAME_KEY, this.handler.getLinkName());
Expand All @@ -95,6 +105,8 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
return;
}
final Message message = decodeDelivery(delivery);
metricsProvider.recordReceivedMessage(message);

final int creditsLeft = receiver.getRemoteCredit();

if (creditsLeft > 0) {
Expand All @@ -110,6 +122,7 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
.addKeyValue("credits", credits)
.log("Adding credits.");
receiver.flow(credits);
metricsProvider.recordAddCredits(credits);
} else {
logger.atVerbose()
.addKeyValue("credits", credits)
Expand Down Expand Up @@ -206,6 +219,7 @@ public Mono<Void> addCredits(int credits) {
try {
dispatcher.invoke(() -> {
receiver.flow(credits);
metricsProvider.recordAddCredits(credits);
sink.success();
});
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
private final String activeTimeoutMessage;
private final Scheduler scheduler;

private final AmqpMetricsProvider metricsProvider;

private final Object errorConditionLock = new Object();

private volatile Exception lastKnownLinkError;
Expand All @@ -120,7 +122,7 @@ class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
*/
ReactorSender(AmqpConnection amqpConnection, String entityPath, Sender sender, SendLinkHandler handler,
ReactorProvider reactorProvider, TokenManager tokenManager, MessageSerializer messageSerializer,
AmqpRetryOptions retryOptions, Scheduler scheduler) {
AmqpRetryOptions retryOptions, Scheduler scheduler, AmqpMetricsProvider metricsProvider) {
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
this.sender = Objects.requireNonNull(sender, "'sender' cannot be null.");
this.handler = Objects.requireNonNull(handler, "'handler' cannot be null.");
Expand All @@ -131,6 +133,8 @@ class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
this.retry = RetryUtil.getRetryPolicy(retryOptions);
this.tokenManager = tokenManager;

this.metricsProvider = metricsProvider;

String connectionId = handler.getConnectionId() == null ? NOT_APPLICABLE : handler.getConnectionId();
String linkName = getLinkName() == null ? NOT_APPLICABLE : getLinkName();

Expand Down Expand Up @@ -445,7 +449,7 @@ public Mono<DeliveryState> send(byte[] bytes, int arrayOffset, int messageFormat

return activeEndpointFlux.then(Mono.create(sink -> {
sendWork(new RetriableWorkItem(bytes, arrayOffset, messageFormat, sink, retryOptions.getTryTimeout(),
deliveryState));
deliveryState, metricsProvider));
}));
}

Expand Down Expand Up @@ -511,6 +515,7 @@ private void processSendWork() {
Exception sendException = null;

try {
workItem.beforeTry();
delivery = sender.delivery(deliveryTag.getBytes(UTF_8));
delivery.setMessageFormat(workItem.getMessageFormat());

Expand Down Expand Up @@ -541,7 +546,9 @@ private void processSendWork() {
.addKeyValue("payloadActualSize", workItem.getEncodedMessageSize())
.log("Sendlink advance failed.");

DeliveryState outcome = null;
if (delivery != null) {
outcome = delivery.getRemoteState();
delivery.free();
}

Expand All @@ -554,21 +561,19 @@ private void processSendWork() {
"Entity(%s): send operation failed while advancing delivery(tag: %s).",
entityPath, deliveryTag), context);

workItem.error(exception);
workItem.error(exception, outcome);
}
}
}

private void processDeliveredMessage(Delivery delivery) {
final DeliveryState outcome = delivery.getRemoteState();
final String deliveryTag = new String(delivery.getTag(), UTF_8);

logger.atVerbose()
.addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
.log("Process delivered message.");

final RetriableWorkItem workItem = pendingSendsMap.remove(deliveryTag);

if (workItem == null) {
logger.atVerbose()
.addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
Expand Down Expand Up @@ -613,7 +618,7 @@ private void processDeliveredMessage(Delivery delivery) {
final Duration retryInterval = retry.calculateRetryDelay(exception, retryAttempt);

if (retryInterval == null || retryInterval.compareTo(workItem.getTimeoutTracker().remaining()) > 0) {
cleanupFailedSend(workItem, exception);
cleanupFailedSend(workItem, exception, outcome);
} else {
workItem.setLastKnownException(exception);
try {
Expand All @@ -625,18 +630,19 @@ private void processDeliveredMessage(Delivery delivery) {
new AmqpException(false,
String.format(Locale.US, "Entity(%s): send operation failed while scheduling a"
+ " retry on Reactor, see cause for more details.", entityPath),
schedulerException, handler.getErrorContext(sender)));
schedulerException, handler.getErrorContext(sender)),
outcome);
}
}
} else if (outcome instanceof Released) {
cleanupFailedSend(workItem, new OperationCancelledException(outcome.toString(),
handler.getErrorContext(sender)));
handler.getErrorContext(sender)), outcome);
} else if (outcome instanceof Declared) {
final Declared declared = (Declared) outcome;
workItem.success(declared);
} else {
cleanupFailedSend(workItem, new AmqpException(false, outcome.toString(),
handler.getErrorContext(sender)));
handler.getErrorContext(sender)), outcome);
}
}

Expand All @@ -651,9 +657,9 @@ private void scheduleWorkOnDispatcher() {
}
}

private void cleanupFailedSend(final RetriableWorkItem workItem, final Exception exception) {
private void cleanupFailedSend(final RetriableWorkItem workItem, final Exception exception, final DeliveryState deliveryState) {
//TODO (conniey): is there some timeout task I should handle?
workItem.error(exception);
workItem.error(exception, deliveryState);
}

private void completeClose() {
Expand Down Expand Up @@ -684,8 +690,9 @@ private void handleError(Throwable error) {
.log("Disposing pending sends with error.");
}

pendingSendsMap.forEach((key, value) -> value.error(error));
pendingSendsMap.forEach((key, value) -> value.error(error, null));
pendingSendsMap.clear();

pendingSendsQueue.clear();
}

Expand All @@ -706,7 +713,7 @@ private void handleClose() {
.log("Disposing pending sends.");
}

pendingSendsMap.forEach((key, value) -> value.error(new AmqpException(true, message, context)));
pendingSendsMap.forEach((key, value) -> value.error(new AmqpException(true, message, context), null));
pendingSendsMap.clear();
pendingSendsQueue.clear();
}
Expand Down Expand Up @@ -798,7 +805,7 @@ public void run() {
handler.getErrorContext(sender));
}

workItem.error(exception);
workItem.error(exception, null);
}
}
}
Loading

0 comments on commit ed45b5e

Please sign in to comment.