ChangeFeed Spark Bug Processing All Partitions (#42553)
* ChangeFeedEndLSN for spark the java implementation

* ChangeFeedEndLSN for spark the java implementation

* ChangeFeedEndLSN for spark the spark connector implementation

* Added tests for using endLSN

* Revert module-info.java

* Updated changelog

* Reacting to comments

* Reacting to comments

* Fix Tests

* Fix Tests

* Fix Tests

* Fix tests

* Fix tests

* Add delay for changes and more logs

* added more logs

* added more logs

* Changed test to not use shared database

* Fix tests

* Fix tests and remove logs

* Reacting to comments

* Reacting to comments

* Fix tests

* Reacting to comments

* management sdk fix

* check azure identity version

* revert azure-identity version

* Revert identity dep

* Merge main

* Move tests to live tests

* Move to live tests
tvaron3 authored Nov 26, 2024
1 parent ceff155 commit 05ed0e4
Showing 20 changed files with 583 additions and 57 deletions.
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
+* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

@@ -27,7 +28,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
-* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
+* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
+* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

@@ -27,7 +28,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
-* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
+* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
+* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

@@ -27,7 +28,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
-* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
+* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
+* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

@@ -27,7 +28,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
-* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
+* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
+* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553)

#### Other Changes

@@ -27,7 +28,7 @@
### 4.33.0 (2024-06-22)

#### Features Added
-* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
+* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)

#### Bugs Fixed
* Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714)
@@ -175,6 +175,10 @@ private case class ChangeFeedPartitionReader
case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
changeFeedItemDeserializerV1
}
+if (this.partition.endLsn.isDefined) {
+ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor
+.setEndLSN(options, this.partition.endLsn.get)
+}

options.setCustomItemSerializer(itemDeserializer)
}
@@ -224,28 +228,12 @@ private case class ChangeFeedPartitionReader
},
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
-operationContextAndListenerTuple
+operationContextAndListenerTuple,
+this.partition.endLsn
)

override def next(): Boolean = {
-this.iterator.hasNext && this.validateNextLsn
-}
-
-private[this] def validateNextLsn: Boolean = {
-this.partition.endLsn match {
-case None =>
-// In batch mode endLsn is cleared - we will always continue reading until the change feed is
-// completely drained so all partitions return 304
-true
-case Some(endLsn) =>
-// In streaming mode we only continue until we hit the endOffset's continuation Lsn
-val node = this.iterator.head()
-assert(node.lsn != null, "Change feed responses must have _lsn property.")
-assert(node.lsn != "", "Change feed responses must have non empty _lsn.")
-val nextLsn = SparkBridgeImplementationInternal.toLsn(node.lsn)
-
-nextLsn <= endLsn
-}
+this.iterator.hasNext
}

override def get(): InternalRow = {
@@ -250,7 +250,8 @@ private case class ItemsPartitionReader
},
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
-operationContextAndListenerTuple
+operationContextAndListenerTuple,
+None
)

private val rowSerializer: ExpressionEncoder.Serializer[Row] = RowSerializerPool.getOrCreateSerializer(readSchema)
@@ -2,7 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

-import com.azure.cosmos.{CosmosException, spark}
+import com.azure.cosmos.CosmosException
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple
import com.azure.cosmos.models.FeedResponse
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
@@ -14,9 +14,8 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.util.Random
import scala.util.control.Breaks
import scala.concurrent.{Await, ExecutionContext, Future}
-import com.azure.cosmos.implementation.OperationCancelledException
+import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, OperationCancelledException, SparkBridgeImplementationInternal}

import scala.concurrent.duration.FiniteDuration

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
@@ -42,7 +41,8 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
val cosmosPagedFluxFactory: String => CosmosPagedFlux[TSparkRow],
val pageSize: Int,
val pagePrefetchBufferSize: Int,
-val operationContextAndListener: Option[OperationContextAndListenerTuple]
+val operationContextAndListener: Option[OperationContextAndListenerTuple],
+val endLsn: Option[Long]
) extends BufferedIterator[TSparkRow] with BasicLoggingTrait with AutoCloseable {

private[spark] var maxRetryIntervalInMs = CosmosConstants.maxRetryIntervalForTransientFailuresInMs
@@ -162,7 +162,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
val iteratorCandidate = feedResponse.getResults.iterator().asScala.buffered
lastContinuationToken.set(feedResponse.getContinuationToken)

-if (iteratorCandidate.hasNext) {
+if (iteratorCandidate.hasNext && validateNextLsn(iteratorCandidate)) {
currentItemIterator = Some(iteratorCandidate)
Some(true)
} else {
@@ -178,7 +178,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]

private def hasBufferedNext: Boolean = {
currentItemIterator match {
-case Some(iterator) => if (iterator.hasNext) {
+case Some(iterator) => if (iterator.hasNext && validateNextLsn(iterator)) {
true
} else {
currentItemIterator = None
@@ -239,6 +239,27 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
returnValue.get
}

+private[this] def validateNextLsn(itemIterator: BufferedIterator[TSparkRow]): Boolean = {
+this.endLsn match {
+case None =>
+// Only relevant in change feed
+// In batch mode endLsn is cleared - we will always continue reading until the change feed is
+// completely drained so all partitions return 304
+true
+case Some(endLsn) =>
+// In streaming mode we only continue until we hit the endOffset's continuation Lsn
+if (itemIterator.isEmpty) {
+return false
+}
+val node = itemIterator.head.asInstanceOf[ChangeFeedSparkRowItem]
+assert(node.lsn != null, "Change feed responses must have _lsn property.")
+assert(node.lsn != "", "Change feed responses must have non empty _lsn.")
+val nextLsn = SparkBridgeImplementationInternal.toLsn(node.lsn)
+
+nextLsn <= endLsn
+}
+}
+
// Correct way to cancel a flux and dispose it
// https://github.com/reactor/reactor-core/blob/main/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java#L837
override def close(): Unit = {
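
The end-LSN check that this commit moves into `TransientIOErrorsRetryingIterator` can be illustrated in isolation. The following is only a minimal sketch, not the connector's code: `Item` and `LsnBoundedIterator` are hypothetical names introduced for illustration, the LSN is modeled as a plain `Long` instead of being parsed from `_lsn`, and items are assumed to arrive in non-decreasing LSN order. It shows the same idea as `validateNextLsn`: peek at the head of a `BufferedIterator` and stop once the next item's LSN exceeds the bound, while `None` means the feed is drained completely.

```scala
// Hypothetical stand-in for a change feed row; only the LSN matters here.
final case class Item(id: String, lsn: Long)

// Wraps a buffered iterator and stops before the first item whose LSN exceeds endLsn.
// With endLsn = None there is no bound and the underlying iterator is drained.
final class LsnBoundedIterator(underlying: BufferedIterator[Item], endLsn: Option[Long])
  extends Iterator[Item] {

  // Peek at the head without consuming it; Option.forall returns true for None.
  override def hasNext: Boolean =
    underlying.hasNext && endLsn.forall(end => underlying.head.lsn <= end)

  override def next(): Item =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("no more items within the end LSN")
}

object LsnBoundedIteratorDemo {
  def main(args: Array[String]): Unit = {
    val items = List(Item("a", 10L), Item("b", 11L), Item("c", 12L), Item("d", 13L))

    // Bounded read (streaming-like): stops after LSN 11.
    val bounded = new LsnBoundedIterator(items.iterator.buffered, Some(11L)).map(_.id).toList
    // Unbounded read (batch-like): drains everything.
    val unbounded = new LsnBoundedIterator(items.iterator.buffered, None).map(_.id).toList

    println(bounded)   // List(a, b)
    println(unbounded) // List(a, b, c, d)
  }
}
```

Because the bound is checked inside the iterator, a caller's `next()` can reduce to a plain `hasNext` call, which mirrors the simplification made to `ChangeFeedPartitionReader` in this commit.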