Skip to content

Commit

Permalink
Adding rntbd open connections information in client telemetry (#26393)
Browse files Browse the repository at this point in the history
* Adding rntbd open connections information in client telemetry

* Fixing spot bug

* Adding CT enable check before collecting data from rntbd

* Adding null check
  • Loading branch information
simplynaveen20 authored Feb 4, 2022
1 parent 5a0a5d7 commit ed04f03
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,13 @@
<Bug pattern="URF_UNREAD_FIELD"/>
</Match>

<!-- Bug: /~https://github.com/Azure/azure-sdk-for-java/issues/9077 -->
<Match>
<Class name="com.azure.cosmos.implementation.directconnectivity.StoreClientFactory"/>
<Field name="clientTelemetry"/>
<Bug pattern="URF_UNREAD_FIELD"/>
</Match>

<!-- Bug: /~https://github.com/Azure/azure-sdk-for-java/issues/9078 -->
<Match>
<Class name="com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestHeaders"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,16 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
collectionCache);

updateGatewayProxy();
if (this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY) {
this.storeModel = this.gatewayProxy;
} else {
this.initializeDirectConnectivity();
}
clientTelemetry = new ClientTelemetry(null, UUID.randomUUID().toString(),
ManagementFactory.getRuntimeMXBean().getName(), userAgentContainer.getUserAgent(),
connectionPolicy.getConnectionMode(), globalEndpointManager.getLatestDatabaseAccount().getId(),
null, null, this.reactorHttpClient, connectionPolicy.isClientTelemetryEnabled(), this, this.connectionPolicy.getPreferredRegions());
clientTelemetry.init();
if (this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY) {
this.storeModel = this.gatewayProxy;
} else {
this.initializeDirectConnectivity();
}
this.queryPlanCache = new ConcurrentHashMap<>();
this.retryPolicy.setRxCollectionCache(this.collectionCache);
} catch (Exception e) {
Expand Down Expand Up @@ -503,7 +503,8 @@ private void initializeDirectConnectivity() {
this.connectionPolicy,
// this.maxConcurrentConnectionOpenRequests,
this.userAgentContainer,
this.connectionSharingAcrossClientsEnabled
this.connectionSharingAcrossClientsEnabled,
this.clientTelemetry
);

this.createStoreModel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class ClientTelemetry {
public final static String REQUEST_CHARGE_NAME = "RequestCharge";
public final static String REQUEST_CHARGE_UNIT = "RU";

public final static String TCP_NEW_CHANNEL_LATENCY_NAME = "TcpNewChannelOpenLatency";
public final static String TCP_NEW_CHANNEL_LATENCY_UNIT = "MilliSecond";
public final static int TCP_NEW_CHANNEL_LATENCY_MAX_MILLI_SEC = 300000;
public final static int TCP_NEW_CHANNEL_LATENCY_PRECISION = 2;

public final static int CPU_MAX = 100;
public final static int CPU_PRECISION = 2;
private final static String CPU_NAME = "CPU";
Expand Down Expand Up @@ -136,6 +141,10 @@ public static void recordValue(ConcurrentDoubleHistogram doubleHistogram, double
}
}

public boolean isClientTelemetryEnabled() {
return isClientTelemetryEnabled;
}

public void init() {
loadAzureVmMetaData();
sendClientTelemetry().subscribe();
Expand Down Expand Up @@ -266,6 +275,7 @@ private static <T> T parse(String itemResponseBodyAsString, Class<T> itemClassTy
}

private void clearDataForNextRun() {
this.clientTelemetryInfo.getSystemInfoMap().clear();
this.clientTelemetryInfo.getOperationInfoMap().clear();
this.clientTelemetryInfo.getCacheRefreshInfoMap().clear();
for (ConcurrentDoubleHistogram histogram : this.clientTelemetryInfo.getSystemInfoMap().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class RntbdTransportClient extends TransportClient {
private final RntbdEndpoint.Provider endpointProvider;
private final long id;
private final Tag tag;
private boolean channelAcquisitionContextEnabled;

// endregion

Expand All @@ -106,12 +108,14 @@ public RntbdTransportClient(
final Configs configs,
final ConnectionPolicy connectionPolicy,
final UserAgentContainer userAgent,
final IAddressResolver addressResolver) {
final IAddressResolver addressResolver,
final ClientTelemetry clientTelemetry) {

this(
new Options.Builder(connectionPolicy).userAgent(userAgent).build(),
configs.getSslContext(),
addressResolver);
addressResolver,
clientTelemetry);
}

RntbdTransportClient(final RntbdEndpoint.Provider endpointProvider) {
Expand All @@ -123,16 +127,19 @@ public RntbdTransportClient(
RntbdTransportClient(
final Options options,
final SslContext sslContext,
final IAddressResolver addressResolver) {
final IAddressResolver addressResolver,
final ClientTelemetry clientTelemetry) {

this.endpointProvider = new RntbdServiceEndpoint.Provider(
this,
options,
checkNotNull(sslContext, "expected non-null sslContext"),
addressResolver);
addressResolver,
clientTelemetry);

this.id = instanceCount.incrementAndGet();
this.tag = RntbdTransportClient.tag(this.id);
this.channelAcquisitionContextEnabled = options.channelAcquisitionContextEnabled;
}

// endregion
Expand Down Expand Up @@ -229,7 +236,9 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
response.setRequestPayloadLength(request.getContentLength());
response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength());
response.setRntbdPendingRequestSize(record.pendingRequestQueueSize());
response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
if(this.channelAcquisitionContextEnabled) {
response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
}
}

})).onErrorMap(throwable -> {
Expand Down Expand Up @@ -263,7 +272,9 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize());
BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength());
BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted());
BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());
if(this.channelAcquisitionContextEnabled) {
BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());
}

return cosmosException;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.LifeCycleUtils;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -37,13 +38,14 @@ public static TransportClient getOrCreateInstance(
ConnectionPolicy connectionPolicy,
UserAgentContainer userAgent,
DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig,
IAddressResolver addressResolver) {
IAddressResolver addressResolver,
ClientTelemetry clientTelemetry) {

synchronized (SharedTransportClient.class) {
if (sharedTransportClient == null) {
assert counter.get() == 0;
logger.info("creating a new shared RntbdTransportClient");
sharedTransportClient = new SharedTransportClient(protocol, configs, connectionPolicy, userAgent, addressResolver);
sharedTransportClient = new SharedTransportClient(protocol, configs, connectionPolicy, userAgent, addressResolver, clientTelemetry);
} else {
logger.info("Reusing an instance of RntbdTransportClient");
}
Expand All @@ -62,11 +64,12 @@ private SharedTransportClient(
Configs configs,
ConnectionPolicy connectionPolicy,
UserAgentContainer userAgent,
IAddressResolver addressResolver) {
IAddressResolver addressResolver,
ClientTelemetry clientTelemetry) {
if (protocol == Protocol.TCP) {
this.rntbdOptions =
new RntbdTransportClient.Options.Builder(connectionPolicy).userAgent(userAgent).build();
this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver);
this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver, clientTelemetry);

} else if (protocol == Protocol.HTTPS){
this.rntbdOptions = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.SessionContainer;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;

// We suppress the "try" warning here because the close() method's signature
// allows it to throw InterruptedException which is strongly advised against
Expand All @@ -21,16 +22,19 @@ public class StoreClientFactory implements AutoCloseable {
private final Configs configs;
private final TransportClient transportClient;
private volatile boolean isClosed;
private final ClientTelemetry clientTelemetry;

public StoreClientFactory(
IAddressResolver addressResolver,
DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig,
Configs configs,
ConnectionPolicy connectionPolicy,
UserAgentContainer userAgent,
boolean enableTransportClientSharing) {
boolean enableTransportClientSharing,
ClientTelemetry clientTelemetry) {

this.configs = configs;
this.clientTelemetry = clientTelemetry;
Protocol protocol = configs.getProtocol();
if (enableTransportClientSharing) {
this.transportClient = SharedTransportClient.getOrCreateInstance(
Expand All @@ -39,15 +43,16 @@ public StoreClientFactory(
connectionPolicy,
userAgent,
diagnosticsClientConfig,
addressResolver);
addressResolver,
clientTelemetry);
} else {
if (protocol == Protocol.HTTPS) {
this.transportClient = new HttpTransportClient(configs, connectionPolicy, userAgent);
} else if (protocol == Protocol.TCP) {

RntbdTransportClient.Options rntbdOptions =
new RntbdTransportClient.Options.Builder(connectionPolicy).userAgent(userAgent).build();
this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver);
this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver, clientTelemetry);
diagnosticsClientConfig.withRntbdOptions(rntbdOptions);

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ReportPayload;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -25,11 +31,12 @@ public List<RntbdChannelAcquisitionEvent> getEvents() {

public static RntbdChannelAcquisitionEvent startNewEvent(
RntbdChannelAcquisitionTimeline timeline,
RntbdChannelAcquisitionEventType eventType) {
RntbdChannelAcquisitionEventType eventType,
ClientTelemetry clientTelemetry) {

if (timeline != null) {
RntbdChannelAcquisitionEvent newEvent = new RntbdChannelAcquisitionEvent(eventType, Instant.now());
timeline.addNewEvent(newEvent);
timeline.addNewEvent(newEvent, clientTelemetry);

return newEvent;
}
Expand All @@ -39,20 +46,37 @@ public static RntbdChannelAcquisitionEvent startNewEvent(
public static RntbdPollChannelEvent startNewPollEvent(
RntbdChannelAcquisitionTimeline timeline,
int availableChannels,
int acquiredChannels) {
int acquiredChannels,
ClientTelemetry clientTelemetry) {

if (timeline != null) {
RntbdPollChannelEvent newEvent = new RntbdPollChannelEvent(availableChannels, acquiredChannels, Instant.now());
timeline.addNewEvent(newEvent);
timeline.addNewEvent(newEvent, clientTelemetry);
return newEvent;
}

return null;
}

private void addNewEvent(RntbdChannelAcquisitionEvent event) {
private void addNewEvent(RntbdChannelAcquisitionEvent event, ClientTelemetry clientTelemetry) {
if (this.currentEvent != null) {
this.currentEvent.complete(event.getCreatedTime());
if(clientTelemetry!= null && Configs.isClientTelemetryEnabled(clientTelemetry.isClientTelemetryEnabled())) {
if (event.getEventType().equals(RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE)) {
ReportPayload reportPayload = new ReportPayload(ClientTelemetry.TCP_NEW_CHANNEL_LATENCY_NAME,
ClientTelemetry.TCP_NEW_CHANNEL_LATENCY_UNIT);
ConcurrentDoubleHistogram newChannelLatencyHistogram =
clientTelemetry.getClientTelemetryInfo().getSystemInfoMap().get(reportPayload);
if (newChannelLatencyHistogram == null) {
newChannelLatencyHistogram =
new ConcurrentDoubleHistogram(ClientTelemetry.TCP_NEW_CHANNEL_LATENCY_MAX_MILLI_SEC,
ClientTelemetry.TCP_NEW_CHANNEL_LATENCY_PRECISION);
clientTelemetry.getClientTelemetryInfo().getSystemInfoMap().put(reportPayload, newChannelLatencyHistogram);
}
ClientTelemetry.recordValue(newChannelLatencyHistogram,
Duration.between(this.currentEvent.getCreatedTime(), this.currentEvent.getCompleteTime()).toMillis());
}
}
}
this.events.add(event);
this.currentEvent = event;
Expand Down
Loading

0 comments on commit ed04f03

Please sign in to comment.