Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeed Spark Bug Processing All Partitions #42553

Merged
merged 36 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4763ac9
ChangeFeedEndLSN for spark the java implementation
tvaron3 Oct 23, 2024
4b571d2
ChangeFeedEndLSN for spark the java implementation
tvaron3 Oct 23, 2024
36e91f9
ChangeFeedEndLSN for spark the spark connector implementation
tvaron3 Oct 23, 2024
99ad812
merge with main
tvaron3 Oct 23, 2024
c7d5a54
Added tests for using endLSN
tvaron3 Oct 24, 2024
1ffa067
Revert module-info.java
tvaron3 Oct 24, 2024
8abeb4e
Updated changelog
tvaron3 Oct 24, 2024
946b4d9
Reacting to comments
tvaron3 Oct 30, 2024
1bb6286
Reacting to comments
tvaron3 Oct 30, 2024
0c5a0e9
Fix Tests
tvaron3 Oct 30, 2024
7e34b71
Fix Tests
tvaron3 Oct 30, 2024
6968402
Fix Tests
tvaron3 Oct 30, 2024
1b1a136
Fix tests
tvaron3 Oct 31, 2024
3aa58c0
Fix tests
tvaron3 Nov 5, 2024
e4c7bc0
Add delay for changes and more logs
tvaron3 Nov 5, 2024
f0a35a1
added more logs
tvaron3 Nov 6, 2024
490bf49
added more logs
tvaron3 Nov 6, 2024
6a27567
Changed test to not use shared database
tvaron3 Nov 6, 2024
8a8eaf5
Fix tests
tvaron3 Nov 6, 2024
4b81481
Fix tests and remove logs
tvaron3 Nov 6, 2024
f41139f
Reacting to comments
tvaron3 Nov 7, 2024
bb02c42
Reacting to comments
tvaron3 Nov 7, 2024
80ca809
Fix tests
tvaron3 Nov 7, 2024
b62f71c
Reacting to comments
tvaron3 Nov 7, 2024
c647d8a
Merge main
tvaron3 Nov 7, 2024
6510f68
management sdk fix
tvaron3 Nov 7, 2024
db1fb2a
check azure identity version
tvaron3 Nov 8, 2024
1ce3f99
revert azure-identity version
tvaron3 Nov 13, 2024
54f0ce9
Merged main
tvaron3 Nov 15, 2024
4a5e41f
Revert identity dep
tvaron3 Nov 15, 2024
90f1b2e
Merge branch 'main' of /~https://github.com/Azure/azure-sdk-for-java in…
tvaron3 Nov 18, 2024
7bac1a3
Merge main
tvaron3 Nov 18, 2024
4a4d348
Merge branch 'main' of /~https://github.com/Azure/azure-sdk-for-java in…
tvaron3 Nov 19, 2024
372889c
Move tests to live tests
tvaron3 Nov 19, 2024
2851805
Move to live tests
tvaron3 Nov 19, 2024
2da246f
Merge branch 'main' of /~https://github.com/Azure/azure-sdk-for-java in…
tvaron3 Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue when using `ChangeFeed` with `spark.cosmos.changeFeed.itemCountPerTriggerHint` causing all cosmos partitions to not be fully processed in some cases. - See [PR 42553](/~https://github.com/Azure/azure-sdk-for-java/pull/42553)
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved

#### Other Changes

Expand All @@ -26,7 +27,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue when using `ChangeFeed` with `spark.cosmos.changeFeed.itemCountPerTriggerHint` causing all cosmos partitions to not be fully processed in some cases. - See [PR 42553](/~https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

Expand All @@ -26,7 +27,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue when using `ChangeFeed` with `spark.cosmos.changeFeed.itemCountPerTriggerHint` causing all cosmos partitions to not be fully processed in some cases. - See [PR 42553](/~https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

Expand All @@ -26,7 +27,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue when using `ChangeFeed` with `spark.cosmos.changeFeed.itemCountPerTriggerHint` causing all cosmos partitions to not be fully processed in some cases. - See [PR 42553](/~https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

Expand All @@ -26,7 +27,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue when using `ChangeFeed` with `spark.cosmos.changeFeed.itemCountPerTriggerHint` causing all cosmos partitions to not be fully processed in some cases. - See [PR 42553](/~https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

Expand All @@ -26,7 +27,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](/~https://github.com/Azure/azure-sdk-for-java/pull/40714)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ private case class ChangeFeedPartitionReader
case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
changeFeedItemDeserializerV1
}
this.partition.endLsn.foreach { endLsn =>
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor
.setEndLSN(options, endLsn)
}

options.setCustomItemSerializer(itemDeserializer)
}
Expand Down Expand Up @@ -244,7 +248,7 @@ private case class ChangeFeedPartitionReader
assert(node.lsn != "", "Change feed responses must have non empty _lsn.")
val nextLsn = SparkBridgeImplementationInternal.toLsn(node.lsn)

nextLsn <= endLsn
nextLsn <= endLsn || this.iterator.hasMoreChangesToProcess.get()
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.{CosmosException, spark}
import com.azure.cosmos.CosmosException
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple
import com.azure.cosmos.models.FeedResponse
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
Expand All @@ -14,9 +14,8 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.util.Random
import scala.util.control.Breaks
import scala.concurrent.{Await, ExecutionContext, Future}
import com.azure.cosmos.implementation.OperationCancelledException
import com.azure.cosmos.implementation.{ImplementationBridgeHelpers, OperationCancelledException}

import scala.concurrent.duration.FiniteDuration

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -56,6 +55,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
private val lastContinuationToken = new AtomicReference[String](null)
// scalastyle:on null
private val retryCount = new AtomicLong(0)
private[spark] val hasMoreChangesToProcess = new AtomicReference[Boolean](true)
private lazy val operationContextString = operationContextAndListener match {
case Some(o) => if (o.getOperationContext != null) {
o.getOperationContext.toString
Expand Down Expand Up @@ -154,6 +154,10 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]

if (hasNext) {
val feedResponse = feedResponseIterator.next()
if (!ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor
.getHasMoreChangesToProcess(feedResponse)) {
hasMoreChangesToProcess.set(false)
}
if (operationContextAndListener.isDefined) {
operationContextAndListener.get.getOperationListener.feedResponseProcessedListener(
operationContextAndListener.get.getOperationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -843,6 +845,57 @@ public void split_only_notModified() throws Exception {
assertThat(stateAfterLastDrainAttempt.getContinuation().getCompositeContinuationTokens()).hasSize(3);
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryCompleteAfterAvailableNowDataProvider", timeOut = 100 * TIMEOUT)
public void changeFeedQueryCompleteAfterEndLSN(
int throughput,
boolean shouldContinuouslyIngestItems) {
String testContainerId = UUID.randomUUID().toString();

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);

List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
AtomicInteger currentPageCount = new AtomicInteger(0);

insertDocuments(1, 6, testContainer);
CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor()
.setEndLSN(cosmosChangeFeedRequestOptions, 4L);

AtomicInteger totalQueryCount = new AtomicInteger(0);
AtomicBoolean hasMoreChanges = new AtomicBoolean(false);
testContainer.queryChangeFeed(cosmosChangeFeedRequestOptions, JsonNode.class)
.byPage(1)
.flatMap(response -> {
int currentPage = currentPageCount.incrementAndGet();
totalQueryCount.set(totalQueryCount.get() + response.getResults().size());
hasMoreChanges.set(ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor()
.getHasMoreChangesToProcess(response));


// Only start creating new items once we have looped through all feedRanges once to make the test behavior more deterministic
if (shouldContinuouslyIngestItems && currentPage >= feedRanges.size()) {
return testContainer
.createItem(getDocumentDefinition(UUID.randomUUID().toString())).then();
} else {
return Mono.empty();
}
})
.blockLast();
assertThat(hasMoreChanges.get()).isFalse();
assertThat(totalQueryCount.get()).isEqualTo(3);
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryCompleteAfterAvailableNowDataProvider", timeOut = 100 * TIMEOUT)
public void changeFeedQueryCompleteAfterAvailableNow(
int throughput,
Expand Down Expand Up @@ -912,7 +965,7 @@ void insertDocuments(

ArrayList<Mono<CosmosItemResponse<ObjectNode>>> result = new ArrayList<>();
for (int i = 0; i < docs.size(); i++) {
result.add(createdAsyncContainer
result.add(container
.createItem(docs.get(i)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,16 @@ public static <T> FeedResponse<T> createFeedResponseWithQueryMetrics(
QueryInfo.QueryPlanDiagnosticsContext diagnosticsContext,
boolean useEtagAsContinuation,
boolean isNoChangesResponse,
CosmosDiagnostics cosmosDiagnostics) {
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
CosmosDiagnostics cosmosDiagnostics,
Boolean hasMoreChangesToProcess) {
FeedResponse<T> feedResponseWithQueryMetrics = ModelBridgeInternal.createFeedResponseWithQueryMetrics(
results,
headers,
queryMetricsMap,
diagnosticsContext,
useEtagAsContinuation,
isNoChangesResponse);
isNoChangesResponse,
hasMoreChangesToProcess);

ClientSideRequestStatistics requestStatistics;
if (cosmosDiagnostics != null) {
Expand Down Expand Up @@ -178,6 +180,25 @@ public static <T> FeedResponse<T> createFeedResponseWithQueryMetrics(
return feedResponseWithQueryMetrics;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <T> FeedResponse<T> createFeedResponseWithQueryMetrics(
List<T> results,
Map<String, String> headers,
ConcurrentMap<String, QueryMetrics> queryMetricsMap,
QueryInfo.QueryPlanDiagnosticsContext diagnosticsContext,
boolean useEtagAsContinuation,
boolean isNoChangesResponse,
CosmosDiagnostics cosmosDiagnostics) {
return createFeedResponseWithQueryMetrics(results,
headers,
queryMetricsMap,
diagnosticsContext,
useEtagAsContinuation,
isNoChangesResponse,
cosmosDiagnostics,
null);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosDiagnostics createCosmosDiagnostics(Map<String, QueryMetrics> queryMetricsMap) {
return new CosmosDiagnostics(new FeedResponseDiagnostics(queryMetricsMap, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ private <T> FeedResponse<T> prepareFeedResponse(
boolean isNoChangesResponse = isChangeFeed ?
ModelBridgeInternal.getNoChangesFromFeedResponse(response)
: false;
Boolean hasMoreChangesToProcess = ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor()
.getHasMoreChangesToProcess(response);

return BridgeInternal.createFeedResponseWithQueryMetrics(
response.getResults(),
Expand All @@ -1195,7 +1197,8 @@ private <T> FeedResponse<T> prepareFeedResponse(
ModelBridgeInternal.getQueryPlanDiagnosticsContext(response),
useEtagAsContinuation,
isNoChangesResponse,
response.getCosmosDiagnostics());
response.getCosmosDiagnostics(),
hasMoreChangesToProcess);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public Flux<FeedResponse<T>> executeAsync() {
this.options.getMaxPrefetchPageCount(),
ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options),
this.options.isCompleteAfterAllCurrentChangesRetrieved(),
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.getEndLSN(this.options),
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private String collectionRid;
private Set<String> keywordIdentifiers;
private boolean completeAfterAllCurrentChangesRetrieved;
private Long endLSN;

public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
this.continuationState = toBeCloned.continuationState;
Expand All @@ -70,6 +71,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.partitionKeyDefinition = toBeCloned.partitionKeyDefinition;
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
this.endLSN = toBeCloned.endLSN;
}

public CosmosChangeFeedRequestOptionsImpl(
Expand Down Expand Up @@ -363,6 +365,14 @@ public Set<String> getKeywordIdentifiers() {
return this.keywordIdentifiers;
}

public void setEndLSN(Long endLSN) {
this.endLSN = endLSN;
}

public Long getEndLSN() {
return endLSN;
}

public boolean isCompleteAfterAllCurrentChangesRetrieved() {
return this.completeAfterAllCurrentChangesRetrieved;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.azure.cosmos.SessionRetryOptions;
import com.azure.cosmos.ThroughputControlGroupConfig;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.batch.BulkExecutorDiagnosticsTracker;
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
Expand Down Expand Up @@ -82,7 +81,6 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.net.URI;
import java.time.Duration;
Expand All @@ -91,7 +89,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -374,6 +371,8 @@ public interface CosmosChangeFeedRequestOptionsAccessor {
CosmosChangeFeedRequestOptions setHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions, String name, String value);
Map<String, String> getHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
CosmosChangeFeedRequestOptionsImpl getImpl(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
CosmosChangeFeedRequestOptions setEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions, Long endLsn);
Long getEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
void setOperationContext(CosmosChangeFeedRequestOptions changeFeedRequestOptions, OperationContextAndListenerTuple operationContext);
OperationContextAndListenerTuple getOperationContext(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
CosmosDiagnosticsThresholds getDiagnosticsThresholds(CosmosChangeFeedRequestOptions options);
Expand Down Expand Up @@ -1086,6 +1085,8 @@ <T> FeedResponse<T> createChangeFeedResponse(RxDocumentServiceResponse response,
<TNew, T> FeedResponse<TNew> convertGenericType(FeedResponse<T> feedResponse, Function<T, TNew> conversion);
<T> FeedResponse<T> createFeedResponse(
List<T> results, Map<String, String> headers, CosmosDiagnostics diagnostics);
<T> FeedResponse<T> setHasMoreChangesToProcess(FeedResponse<T> feedResponse, boolean hasMoreChangesToProcess);
<T> Boolean getHasMoreChangesToProcess(FeedResponse<T> feedResponse);
}
}

Expand Down
Loading
Loading