Skip to content

Commit

Permalink
Client throughput control: Deferring store invocation (#22144)
Browse files Browse the repository at this point in the history
* Dummy

* Deferring teh request when ClinetThrouhgputControl is enabled

* Client throughput control: defer store invocation

* Adding additional test-coverage in throughput tests

* Reacting to code review feedback.

* Adding back createItem_withBulk test
  • Loading branch information
FabianMeiswinkel authored Jun 8, 2021
1 parent c44cbf4 commit cd2363d
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOper
return ((ItemBulkOperation<?>) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap(
result -> {
if (result.shouldRetry) {
groupSink.next(itemOperation);
return Mono.empty();
return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation);
} else {
return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, cosmosBulkItemResponse, this.batchContext));
Expand Down Expand Up @@ -281,24 +280,38 @@ private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExec
return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
}

private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(
Duration backOffTime,
FluxSink<CosmosItemOperation> groupSink,
CosmosItemOperation itemOperation) {

if (backOffTime == null || backOffTime.isZero()) {
groupSink.next(itemOperation);
return Mono.empty();
} else {
return Mono
.delay(backOffTime)
.flatMap((dumm) -> {
groupSink.next(itemOperation);
return Mono.empty();
});
}
}

private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(
CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink,
CosmosException cosmosException, ItemBulkOperation<?> itemBulkOperation) {
return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> {
if (result.shouldRetry) {

groupSink.next(itemOperation);
return Mono.empty();
return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation);
} else {

return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, exception, this.batchContext));
}
});
}

private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {

return this.docClientWrapper.executeBatchRequest(
BridgeInternal.getLink(this.container), serverRequest, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public Mono<StoreResponse> invokeResourceOperationAsync(Uri physicalAddress, RxD
request.requestContext.resourcePhysicalAddress = physicalAddress.toString();
}
if (this.throughputControlStore != null) {
return this.throughputControlStore.processRequest(request, this.invokeStoreAsync(physicalAddress, request));
return this.throughputControlStore.processRequest(
request,
Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request)));
}

return this.invokeStoreAsync(physicalAddress, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.azure.cosmos;

import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,14 +16,12 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class CosmosBulkAsyncTest extends BatchTestBase {

Expand All @@ -38,7 +38,10 @@ public CosmosBulkAsyncTest(CosmosClientBuilder clientBuilder) {
@BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT)
public void before_CosmosBulkAsyncTest() {
assertThat(this.bulkClient).isNull();
this.bulkClient = getClientBuilder().buildAsyncClient();
ThrottlingRetryOptions throttlingOptions = new ThrottlingRetryOptions()
.setMaxRetryAttemptsOnThrottledRequests(1000000)
.setMaxRetryWaitTime(Duration.ofDays(1));
this.bulkClient = getClientBuilder().throttlingRetryOptions(throttlingOptions).buildAsyncClient();
bulkAsyncContainer = getSharedMultiPartitionCosmosContainer(this.bulkClient);
}

Expand All @@ -47,6 +50,75 @@ public void afterClass() {
safeCloseAsync(this.bulkClient);
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void createItem_withBulkAndThroughputControl() throws InterruptedException {
int totalRequest = getTotalRequest(180, 200);

PartitionKeyDefinition pkDefinition = new PartitionKeyDefinition();
pkDefinition.setPaths(Collections.singletonList("/mypk"));
CosmosAsyncContainer bulkAsyncContainerWithThroughputControl = createCollection(
this.bulkClient,
bulkAsyncContainer.getDatabase().getId(),
new CosmosContainerProperties(UUID.randomUUID().toString(), pkDefinition));

ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder()
.setGroupName("test-group")
.setTargetThroughputThreshold(0.2)
.setDefault(true)
.build();
bulkAsyncContainerWithThroughputControl.enableLocalThroughputControlGroup(groupConfig);

Flux<CosmosItemOperation> cosmosItemOperationFlux = Flux.merge(
Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
TestDoc testDoc = this.populateTestDoc(partitionKey);

return BulkOperations.getUpsertItemOperation(testDoc, new PartitionKey(partitionKey));
}),
Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey);

return BulkOperations.getUpsertItemOperation(eventDoc, new PartitionKey(partitionKey));
}));

BulkProcessingOptions<CosmosBulkAsyncTest> bulkProcessingOptions = new BulkProcessingOptions<>();
bulkProcessingOptions.setMaxMicroBatchSize(100);
bulkProcessingOptions.setMaxMicroBatchConcurrency(5);

try {
Flux<CosmosBulkOperationResponse<CosmosBulkAsyncTest>> responseFlux = bulkAsyncContainerWithThroughputControl
.processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions);

Thread.sleep(1000);

AtomicInteger processedDoc = new AtomicInteger(0);
responseFlux
.flatMap((CosmosBulkOperationResponse<CosmosBulkAsyncTest> cosmosBulkOperationResponse) -> {

processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull();
assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull();
assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull();

return Mono.just(cosmosBulkItemResponse);
}).blockLast();

assertThat(processedDoc.get()).isEqualTo(totalRequest * 2);
} finally {
bulkAsyncContainerWithThroughputControl.delete().block();
}
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void createItem_withBulk() {
int totalRequest = getTotalRequest();
Expand Down Expand Up @@ -79,6 +151,11 @@ public void createItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}

assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -127,6 +204,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -150,6 +231,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -199,6 +284,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -224,6 +313,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -323,6 +416,10 @@ public void upsertItem_withbulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -344,7 +441,7 @@ public void upsertItem_withbulk() {

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void deleteItem_withBulk() {
int totalRequest = getTotalRequest();
int totalRequest = Math.min(getTotalRequest(), 20);

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();

Expand Down Expand Up @@ -376,6 +473,10 @@ public void deleteItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -424,6 +525,10 @@ public void readItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -477,6 +582,10 @@ public void readItemMultipleTimes_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -539,6 +648,10 @@ public void replaceItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -572,6 +685,10 @@ private void createItemsAndVerify(List<CosmosItemOperation> cosmosItemOperations

processedDoc.incrementAndGet();
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -595,10 +712,14 @@ private void createItemsAndVerify(List<CosmosItemOperation> cosmosItemOperations
assertThat(distinctIndex.size()).isEqualTo(cosmosItemOperations.size());
}

private int getTotalRequest() {
int countRequest = new Random().nextInt(100) + 200;
private int getTotalRequest(int min, int max) {
int countRequest = new Random().nextInt(max - min) + min;
logger.info("Total count of request for this test case: " + countRequest);

return countRequest;
}

private int getTotalRequest() {
return getTotalRequest(200, 300);
}
}
Loading

0 comments on commit cd2363d

Please sign in to comment.