feat: skip large rows #2482
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
I haven't looked at RowSetUtil or any of the tests but left some comments you can work on first.
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java
...-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
...d-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
...le/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
/**
 * This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
 */
public final class LargeRowReadCallable<RequestT, ResponseT, RowT>
Since you're modifying the ServerStreamingAttemptCallable directly, we can remove this class.
@Override
protected void onErrorImpl(Throwable t) {
  // this has no impact
  // if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
I think this has no effect because we create a new strategy here: https://github.com/googleapis/sdk-platform-java/blob/main/gax-java/gax/src/main/java/com/google/api/gax/rpc/RetryingServerStreamingCallable.java#L80 (which I didn't notice previously).
Correct! I believe we create a new resumption strategy so that parallel requests each get their own state tracking, through a new resumption strategy object per request.
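To make this exchange concrete, here is a minimal, self-contained sketch of the prototype pattern the linked line relies on: the retrying callable asks the configured strategy for a fresh instance per call, so state recorded on one instance is never seen by the configured prototype or by another call. `ResumptionStrategy`, `LargeRowStrategy`, `recordLargeRow`, and `startCall` are hypothetical stand-ins; only the `createNew()` idea is meant to mirror gax's `StreamResumptionStrategy`, and the real interface has more methods.

```java
import java.util.ArrayList;
import java.util.List;

public class PerCallStrategyDemo {
  // Hypothetical, simplified stand-in for a stream resumption strategy.
  interface ResumptionStrategy {
    ResumptionStrategy createNew(); // returns a fresh instance with empty state
    void recordLargeRow(String rowKey);
    List<String> largeRowKeys();
  }

  static final class LargeRowStrategy implements ResumptionStrategy {
    private final List<String> largeRowKeys = new ArrayList<>();

    @Override
    public ResumptionStrategy createNew() {
      return new LargeRowStrategy();
    }

    @Override
    public void recordLargeRow(String rowKey) {
      largeRowKeys.add(rowKey);
    }

    @Override
    public List<String> largeRowKeys() {
      return largeRowKeys;
    }
  }

  // Models a retrying callable: every call works on its own copy of the configured prototype.
  static ResumptionStrategy startCall(ResumptionStrategy prototype) {
    return prototype.createNew();
  }

  public static void main(String[] args) {
    ResumptionStrategy prototype = new LargeRowStrategy();
    ResumptionStrategy callA = startCall(prototype);
    ResumptionStrategy callB = startCall(prototype);

    callA.recordLargeRow("r2");

    System.out.println(callA.largeRowKeys()); // [r2]
    System.out.println(callB.largeRowKeys()); // []
    System.out.println(prototype.largeRowKeys()); // []
  }
}
```

Parallel reads therefore never share large-row bookkeeping, but any state has to live on the per-call instance rather than on the strategy held by the settings.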
I have resolved the comments that are now implemented in code.
The unresolved comments have not been addressed yet because they need a discussion; can we discuss them offline?
}
}

public void dumpLargeRowKeys() {
This method can be implemented later; I have added Javadoc for it.
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
new RowMergingCallable<>(convertException, rowAdapter);

LargeReadRowsResumptionStrategy<RowT> largeRowResumptionStrategy;
largeRowResumptionStrategy = new LargeReadRowsResumptionStrategy<RowT>(rowAdapter);
testing this
@@ -0,0 +1,125 @@
/*
[duplicate comment]
[can discuss offline]
It is required.
The error goes through the ConvertExceptionCallable layer, which converts the FAILED_PRECONDITION error into an ApiException and marks it as `retryable: true`.
The exception is then thrown to the ResumptionStrategy layer. Hence, this is required.
I have confirmed this while testing and debugging as well.
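A minimal sketch of the layering described above, under stated assumptions: a conversion step marks a FAILED_PRECONDITION error as retryable only when it is a large-row error, and the retry step hands only retryable errors on to the resumption logic. `ApiError`, `LARGE_ROW_REASON`, `convert`, and `handle` are hypothetical stand-ins for `ApiException`, the client's large-row detection, `ConvertExceptionCallable`, and the retry layer; how the real client recognises a large-row error may differ.

```java
public class ConvertLargeRowErrorDemo {
  enum Code { FAILED_PRECONDITION, NOT_FOUND }

  // Hypothetical stand-in for ApiException: a status code plus a retryable flag.
  static final class ApiError extends RuntimeException {
    final Code code;
    final boolean retryable;

    ApiError(Code code, String reason, boolean retryable) {
      super(code + ": " + reason);
      this.code = code;
      this.retryable = retryable;
    }
  }

  // Hypothetical marker for a large-row error; the real client's detection may differ.
  static final String LARGE_ROW_REASON = "LARGE_ROW";

  // Conversion layer: only a FAILED_PRECONDITION caused by a large row becomes retryable.
  static ApiError convert(Code code, String reason) {
    boolean largeRow = code == Code.FAILED_PRECONDITION && LARGE_ROW_REASON.equals(reason);
    return new ApiError(code, reason, largeRow);
  }

  // Retry layer: only retryable errors reach the resumption strategy.
  static String handle(ApiError error) {
    return error.retryable ? "resume without the large row" : "fail the read";
  }

  public static void main(String[] args) {
    System.out.println(handle(convert(Code.FAILED_PRECONDITION, LARGE_ROW_REASON)));
    System.out.println(handle(convert(Code.FAILED_PRECONDITION, "some other precondition")));
  }
}
```

Without the conversion step, the error would arrive non-retryable and the retry layer would never consult the resumption strategy, which matches the reasoning in the comment.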
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java
added comments
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -377,6 +382,28 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
        .withRetrySettings(settings.readRowsSettings().getRetrySettings()));
}

public <RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsBaseCallable(
I don't think we need to create a base callable for this, so you can probably just extract everything from the other base callable and put it in this method. Otherwise they're all called base callables and it gets a bit confusing.
createSkipLargeRowsBaseCallable(
    settings.readRowsSettings(),
    rowAdapter,
    new LargeReadRowsResumptionStrategy<RowT>(rowAdapter));
I don't think you need to pass in the resumption strategy here because it's already overridden in the settings in the other callable (line 580), so this variable is not doing anything.
...d-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
@@ -221,12 +222,17 @@ public void onResponseImpl(ResponseT response) {

@Override
public void onErrorImpl(Throwable t) {
  if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
- if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
+ if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
This still needs to be updated.
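The suggested change matters for subclasses: `getClass() == X.class` matches only the exact class, while `instanceof` also matches subclasses and returns `false` (instead of throwing) for `null`. A small self-contained illustration; the nested `LargeReadRowsResumptionStrategy` is an empty stand-in, not the real class, and `InstrumentedStrategy` is a hypothetical subclass.

```java
public class TypeCheckDemo {
  // Empty stand-in for the real strategy class.
  static class LargeReadRowsResumptionStrategy {}

  // Hypothetical subclass, e.g. a wrapper or test double that extends the strategy.
  static class InstrumentedStrategy extends LargeReadRowsResumptionStrategy {}

  static boolean exactClassCheck(Object strategy) {
    // Throws NullPointerException if strategy is null.
    return strategy.getClass() == LargeReadRowsResumptionStrategy.class;
  }

  static boolean instanceofCheck(Object strategy) {
    // Matches the class and all of its subclasses; null yields false.
    return strategy instanceof LargeReadRowsResumptionStrategy;
  }

  public static void main(String[] args) {
    Object subclassed = new InstrumentedStrategy();
    System.out.println(exactClassCheck(subclassed)); // false
    System.out.println(instanceofCheck(subclassed)); // true
    System.out.println(instanceofCheck(null)); // false
  }
}
```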
...le/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
have made the changes
remaining =
    RowSetUtil.erase(originalRequest.getRows(), lastSuccessKey, !originalRequest.getReversed());
if (!largeRowKeys.isEmpty()) {
  for (ByteString largeRowKey : largeRowKeys) {
I have made the changes as you suggested.
Open question: is there a reason why we didn't do this earlier? I'm thinking about edge cases that may fail.
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
81dc408 to 18d453c
Warning: This pull request is touching the following templated files:
19293fa to 3397e51
… tests & wip integration tests
…haviour which returns error details,metadata on encountering large rows error
3397e51 to 76e8775
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
...le/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java
d80c953 to 70a9329
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
 * @param fromStart
 * @return
 */
public static List<RowRange> eraseKeyFromRange(RowRange range, ByteString split, boolean fromStart) {
You can simplify this logic:
private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
List<RowRange> rowRanges = new ArrayList<>();
ByteString startKey = StartPoint.extract(range).value;
ByteString endKey = EndPoint.extract(range).value;
// if end key is on the left of large row key, don't split
if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
rowRanges.add(range);
return rowRanges;
}
// if start key is on the right of the large row key, don't split
if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
rowRanges.add(range);
return rowRanges;
}
// if start key is on the left of the large row key, set the end key to be large row key open
if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
RowRange beforeSplit = range
.toBuilder()
.setEndKeyOpen(largeRowKey)
.build();
rowRanges.add(beforeSplit);
}
// if the end key is on the right of the large row key, set the start key to be large row key open
if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
RowRange afterSplit = range
.toBuilder()
.setStartKeyOpen(largeRowKey)
.build();
rowRanges.add(afterSplit);
}
return rowRanges;
}
This should cover all edge cases.
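A self-contained sketch of the same splitting idea on a simplified range type, mainly to check one case: an unbounded end. In the Bigtable `RowRange` proto, an unset end key means the range runs to the end of the table. If `EndPoint.extract(range).value` represents that as an empty `ByteString` (an assumption about `RowSetUtil` internals), the first comparison in the suggestion above would treat such a range as ending before every key and skip the split. Here `Range`, with `null` for an unbounded side, and the `String` keys are hypothetical simplifications of `RowRange` and `ByteString`.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOnLargeRowKeyDemo {
  // Hypothetical simplified RowRange: a null start or end means unbounded on that side.
  record Range(String start, boolean startOpen, String end, boolean endOpen) {}

  static List<Range> splitOnLargeRowKey(Range range, String largeRowKey) {
    List<Range> out = new ArrayList<>();
    // The range ends before the large row key (or excludes it at the end): keep it whole.
    if (range.end() != null) {
      int c = range.end().compareTo(largeRowKey);
      if (c < 0 || (c == 0 && range.endOpen())) {
        out.add(range);
        return out;
      }
    }
    // The range starts after the large row key (or excludes it at the start): keep it whole.
    if (range.start() != null) {
      int c = range.start().compareTo(largeRowKey);
      if (c > 0 || (c == 0 && range.startOpen())) {
        out.add(range);
        return out;
      }
    }
    // Part strictly before the large row key, if the range starts before it.
    if (range.start() == null || range.start().compareTo(largeRowKey) < 0) {
      out.add(new Range(range.start(), range.startOpen(), largeRowKey, true));
    }
    // Part strictly after the large row key, if the range extends past it.
    if (range.end() == null || range.end().compareTo(largeRowKey) > 0) {
      out.add(new Range(largeRowKey, true, range.end(), range.endOpen()));
    }
    return out;
  }

  public static void main(String[] args) {
    // [a, +inf) split on "m" gives [a, m) and (m, +inf).
    System.out.println(splitOnLargeRowKey(new Range("a", false, null, false), "m"));
    // [m, m] is exactly the large row, so nothing remains.
    System.out.println(splitOnLargeRowKey(new Range("m", false, "m", false), "m"));
  }
}
```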
have done. testing.
@@ -49,6 +52,44 @@
public final class RowSetUtil {
  private RowSetUtil() {}

  public static RowSet createSplitRanges(
I think you should also resume from the last seen row key.
I think you want this:
public static RowSet eraseLargeRow(RowSet rowSet, ByteString lastSeenRowKey, ByteString largeRowKey, boolean fromStart) {
// first, remove everything we've already read from the RowSet
RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);
// return null if we've read everything
if (remaining == null) {
return null;
}
// second, remove the large row key from the remaining RowSet
RowSet.Builder newRowSet = RowSet.newBuilder();
// remove large row key from point reads
remaining.getRowKeysList().stream().filter(k -> !k.equals(largeRowKey)).forEach(newRowSet::addRowKeys);
// remove large row key from row ranges
for (RowRange range : remaining.getRowRangesList()) {
List<RowRange> afterSplit = splitOnLargeRowKey(range, largeRowKey);
if (!afterSplit.isEmpty()) {
afterSplit.forEach(newRowSet::addRowRanges);
}
}
if (newRowSet.getRowKeysList().isEmpty() && newRowSet.getRowRangesList().isEmpty()) {
return null;
}
return newRowSet.build();
}
splitOnLargeRowKey is in my other comment.
And in your resumption strategy, you wouldn't need to keep the previous request anymore, because that'll be removed by the RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);
We needed to keep the previous request for the case of several large rows next to each other.
If we don't keep the previous request, and the previous attempt failed on a large row and the next attempt also fails on a large row, the new request removes only the second large-row key while the first one is back in range, so the read keeps failing.
Example:
request [r1,r4]
r1 - successfully read key
r2 - large-row key
r3 - large-row key
r4 - large-row key
original request -> [r1,r4]
r1 is read, r2 fails -> request becomes (r1,r2),(r2,r4]
r3 fails -> request becomes (r1,r3),(r3,r4], which now fails on r2 again (if neither the previous request nor the previously failed row keys are cached)
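The example above can be replayed with a small simulation. Each retry first drops everything up to the last successfully read key, then cuts the failing large-row key out of the row ranges. Rebuilding from the original request with only the latest large-row key brings r2 back; building on the previous request, or excluding every cached large-row key from the original request (as the loop over `largeRowKeys` earlier in this review does), keeps r2 out. `Range`, `eraseUpTo`, `excludeKey`, and `contains` are hypothetical simplifications over `String` keys for forward (non-reversed) reads, not the real `RowSetUtil` API.

```java
import java.util.ArrayList;
import java.util.List;

public class LargeRowRetrySim {
  // Hypothetical bounded row range over String keys (forward reads only).
  record Range(String start, boolean startOpen, String end, boolean endOpen) {
    boolean contains(String key) {
      int s = key.compareTo(start);
      int e = key.compareTo(end);
      return (startOpen ? s > 0 : s >= 0) && (endOpen ? e < 0 : e <= 0);
    }

    boolean isEmpty() {
      int c = start.compareTo(end);
      return c > 0 || (c == 0 && (startOpen || endOpen));
    }
  }

  // Drops every key <= lastSeen, i.e. everything already read.
  static List<Range> eraseUpTo(List<Range> ranges, String lastSeen) {
    List<Range> out = new ArrayList<>();
    for (Range r : ranges) {
      if (r.end().compareTo(lastSeen) <= 0) {
        continue; // range fully consumed
      }
      if (r.start().compareTo(lastSeen) <= 0) {
        out.add(new Range(lastSeen, true, r.end(), r.endOpen()));
      } else {
        out.add(r);
      }
    }
    return out;
  }

  // Splits any range containing key into the parts strictly before and after it.
  static List<Range> excludeKey(List<Range> ranges, String key) {
    List<Range> out = new ArrayList<>();
    for (Range r : ranges) {
      if (!r.contains(key)) {
        out.add(r);
        continue;
      }
      Range before = new Range(r.start(), r.startOpen(), key, true);
      Range after = new Range(key, true, r.end(), r.endOpen());
      if (!before.isEmpty()) out.add(before);
      if (!after.isEmpty()) out.add(after);
    }
    return out;
  }

  static boolean contains(List<Range> ranges, String key) {
    return ranges.stream().anyMatch(r -> r.contains(key));
  }

  public static void main(String[] args) {
    List<Range> original = List.of(new Range("r1", false, "r4", false)); // [r1, r4]

    // Attempt 1: r1 is read, then r2 fails as a large row -> (r1, r2), (r2, r4]
    List<Range> afterR2 = excludeKey(eraseUpTo(original, "r1"), "r2");

    // Attempt 2: r3 fails as a large row; r1 is still the last successfully read key.
    List<Range> fromOriginal = excludeKey(eraseUpTo(original, "r1"), "r3");
    List<Range> fromPrevious = excludeKey(eraseUpTo(afterR2, "r1"), "r3");
    List<Range> fromOriginalAllKeys = eraseUpTo(original, "r1");
    for (String largeRowKey : List.of("r2", "r3")) {
      fromOriginalAllKeys = excludeKey(fromOriginalAllKeys, largeRowKey);
    }

    System.out.println(contains(fromOriginal, "r2")); // true: r2 would fail again
    System.out.println(contains(fromPrevious, "r2")); // false
    System.out.println(contains(fromOriginalAllKeys, "r2")); // false
  }
}
```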
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
...loud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
...table/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -0,0 +1,125 @@
/*
 * Copyright 2021 Google LLC
Please remove this file
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
add comments
...in/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
...igtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java
/gcbrun
🤖 I have created a release *beep* *boop*

---

## [2.53.0](https://togithub.com/googleapis/java-bigtable/compare/v2.52.0...v2.53.0) (2025-02-21)

### Features

* Skip large rows ([#2482](https://togithub.com/googleapis/java-bigtable/issues/2482)) ([cd7f82e](https://togithub.com/googleapis/java-bigtable/commit/cd7f82e4b66dc3c34262c73b26afc2fdfd1deed7))

---

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
Tasks remaining -
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> ☕️
If you write sample code, please follow the samples format.