feat: skip large rows #2482

Merged: 31 commits merged on Feb 21, 2025

Conversation

sarthakbhutani
Member

Tasks remaining:

  • Make changes to the read request API that skips large rows (internally calls readLargeRowsCallable())
  • Expose the row keys of large rows to the client through a side channel, a DLQ, or some other mechanism

@sarthakbhutani sarthakbhutani requested review from a team as code owners February 11, 2025 14:54

google-cla bot commented Feb 11, 2025

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@product-auto-label product-auto-label bot added size: xl Pull request size is extra large. api: bigtable Issues related to the googleapis/java-bigtable API. labels Feb 11, 2025
Contributor

@mutianf mutianf left a comment

I haven't looked at RowSetUtil or any of the tests but left some comments you can work on first.

/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
public final class LargeRowReadCallable<RequestT, ResponseT, RowT>
Contributor

Since you're modifying the ServerStreamingAttemptCallable directly we can remove this class.

@Override
protected void onErrorImpl(Throwable t) {
// this has no impact
// if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
Contributor

I think this has no effect because we create a new strategy here: https://github.com/googleapis/sdk-platform-java/blob/main/gax-java/gax/src/main/java/com/google/api/gax/rpc/RetryingServerStreamingCallable.java#L80 (which I didn't notice previously).

Member Author

Correct! I believe we create a new resumption strategy object per request so that parallel requests each track their own state.
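
To illustrate the point about per-request state, here is a minimal sketch of a retrying layer that copies a prototype strategy before each call. The `Strategy` interface, `LastKeyStrategy`, and `runCall` are hypothetical stand-ins written for this example; they only mimic the `createNew()` pattern discussed above and are not the gax classes themselves.

```java
import java.util.Arrays;
import java.util.List;

final class PerCallStateSketch {
  // Hypothetical stand-in for a stream resumption strategy (not the gax interface).
  interface Strategy {
    Strategy createNew();

    void processResponse(String rowKey);

    String lastSeenKey();
  }

  // Tracks the last row key seen by one call.
  static final class LastKeyStrategy implements Strategy {
    private String lastSeen = "";

    @Override
    public Strategy createNew() {
      return new LastKeyStrategy();
    }

    @Override
    public void processResponse(String rowKey) {
      lastSeen = rowKey;
    }

    @Override
    public String lastSeenKey() {
      return lastSeen;
    }
  }

  // Simulates the retrying layer: it never uses the prototype directly,
  // it asks the prototype for a fresh copy and feeds responses to the copy.
  static Strategy runCall(Strategy prototype, List<String> rows) {
    Strategy perCall = prototype.createNew();
    for (String row : rows) {
      perCall.processResponse(row);
    }
    return perCall;
  }

  public static void main(String[] args) {
    Strategy prototype = new LastKeyStrategy();
    Strategy callA = runCall(prototype, Arrays.asList("r1", "r2"));
    Strategy callB = runCall(prototype, Arrays.asList("x9"));
    System.out.println(callA.lastSeenKey()); // r2
    System.out.println(callB.lastSeenKey()); // x9
    System.out.println(prototype.lastSeenKey().isEmpty()); // true
  }
}
```

In this sketch the prototype never observes any response, which is one plausible reason a check made against the originally supplied strategy object would see no state.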

Member Author

@sarthakbhutani sarthakbhutani left a comment

I have resolved the comments that are now implemented in the code.

The unresolved comments are not done yet because they need a discussion. Can we discuss them offline?

}
}

public void dumpLargeRowKeys() {
Member Author

This method can be implemented later. I have added Javadoc for it.

new RowMergingCallable<>(convertException, rowAdapter);

LargeReadRowsResumptionStrategy<RowT> largeRowResumptionStrategy;
largeRowResumptionStrategy = new LargeReadRowsResumptionStrategy<RowT>(rowAdapter);
Member Author

testing this

@@ -0,0 +1,125 @@
/*
Member Author

[duplicate comment]
[can discuss offline]

It is required.
The error goes to the convertableExceptionCallable layer, which converts the FailedPrecondition exception to an ApiException and marks the exception as `retryable: true`.

The exception then gets thrown to the ResumptionStrategy layer. Hence, this is required.

I have confirmed this in testing/debugging as well.
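
The flow described above can be sketched as follows. Everything in this block is an assumption made for illustration: `SketchApiException`, the `LARGE_ROW_MARKER` text, and the use of `IllegalStateException` as a stand-in for a failed-precondition error are hypothetical and do not reflect the real client classes or the real server message.

```java
final class RetryableConversionSketch {
  // Hypothetical exception type carrying a retryable flag.
  static final class SketchApiException extends RuntimeException {
    final boolean retryable;

    SketchApiException(Throwable cause, boolean retryable) {
      super(cause.getMessage(), cause);
      this.retryable = retryable;
    }
  }

  // Hypothetical marker text; the real server message is not shown in this thread.
  static final String LARGE_ROW_MARKER = "row too large";

  // Conversion layer: wraps the raw error and flags large-row failures as retryable.
  static SketchApiException convert(Throwable t) {
    boolean largeRow =
        t instanceof IllegalStateException
            && t.getMessage() != null
            && t.getMessage().contains(LARGE_ROW_MARKER);
    return new SketchApiException(t, largeRow);
  }

  // Retry layer: only errors flagged as retryable reach the resumption logic.
  static boolean shouldResume(RuntimeException e) {
    return e instanceof SketchApiException && ((SketchApiException) e).retryable;
  }

  public static void main(String[] args) {
    RuntimeException large = convert(new IllegalStateException("row too large: r2"));
    RuntimeException other = convert(new IllegalStateException("table not found"));
    System.out.println(shouldResume(large)); // true
    System.out.println(shouldResume(other)); // false
  }
}
```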

Member Author

@sarthakbhutani sarthakbhutani left a comment

added comments

@@ -377,6 +382,28 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
.withRetrySettings(settings.readRowsSettings().getRetrySettings()));
}

public <RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsBaseCallable(
Contributor

I don't think we need to create a base callable for this, so you can probably just extract everything from the other base callable and put it in this method. Otherwise they're all called base callables and it gets a bit confusing.

createSkipLargeRowsBaseCallable(
settings.readRowsSettings(),
rowAdapter,
new LargeReadRowsResumptionStrategy<RowT>(rowAdapter));
Contributor

I don't think you need to pass in the resumption strategy here because it's already overridden in the settings in the other callable (line 580). So this variable is not doing anything.

@@ -221,12 +222,17 @@ public void onResponseImpl(ResponseT response) {

@Override
public void onErrorImpl(Throwable t) {
if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
Contributor

Suggested change
- if (resumptionStrategy.getClass() == LargeReadRowsResumptionStrategy.class) {
+ if (resumptionStrategy instanceof LargeReadRowsResumptionStrategy) {
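
For context on the suggested change, the two checks differ only when a subclass is involved: `getClass() ==` matches the exact class, while `instanceof` also accepts subclasses. A minimal sketch with hypothetical class names (these are not the classes from this PR):

```java
final class TypeCheckSketch {
  // Hypothetical hierarchy used only to show the difference between the two checks.
  static class BaseStrategy {}

  static class LargeRowStrategy extends BaseStrategy {}

  static class TracingLargeRowStrategy extends LargeRowStrategy {}

  // True only when the runtime class is exactly LargeRowStrategy.
  static boolean exactClassMatch(Object o) {
    return o.getClass() == LargeRowStrategy.class;
  }

  // True for LargeRowStrategy and for any subclass of it.
  static boolean instanceOfMatch(Object o) {
    return o instanceof LargeRowStrategy;
  }

  public static void main(String[] args) {
    Object subclass = new TracingLargeRowStrategy();
    System.out.println(exactClassMatch(subclass)); // false
    System.out.println(instanceOfMatch(subclass)); // true
    System.out.println(instanceOfMatch(new BaseStrategy())); // false
  }
}
```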

Contributor

This still needs to be updated.

Member Author

@sarthakbhutani sarthakbhutani left a comment

have made the changes

remaining =
RowSetUtil.erase(originalRequest.getRows(), lastSuccessKey, !originalRequest.getReversed());
if (!largeRowKeys.isEmpty()) {
for (ByteString largeRowKey : largeRowKeys) {
Member Author

I have made the changes as you suggested.
Open question: is there a reason we didn't do this earlier? I am trying to think of any edge cases that may fail.

Warning: This pull request is touching the following templated files:

  • .kokoro/presubmit/graalvm-native.cfg
  • samples/snapshot/pom.xml

@sarthakbhutani sarthakbhutani force-pushed the large-row-skip branch 3 times, most recently from 19293fa to 3397e51 Compare February 13, 2025 10:08
@mutianf mutianf changed the title Large row skip feat: skip large rows Feb 13, 2025

* @param fromStart
* @return
*/
public static List<RowRange> eraseKeyFromRange(RowRange range, ByteString split, boolean fromStart) {
Contributor

@mutianf mutianf Feb 19, 2025

You can simplify this logic:

    private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
        List<RowRange> rowRanges = new ArrayList<>();

        ByteString startKey = StartPoint.extract(range).value;
        ByteString endKey = EndPoint.extract(range).value;

        // if end key is on the left of large row key, don't split
        if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
            rowRanges.add(range);
            return rowRanges;
        }

        // if start key is on the right of the large row key, don't split
        if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
            rowRanges.add(range);
            return rowRanges;
        }

        // if start key is on the left of the large row key, set the end key to be large row key open
        if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
            RowRange beforeSplit = range
                    .toBuilder()
                    .setEndKeyOpen(largeRowKey)
                    .build();
            rowRanges.add(beforeSplit);
        }

        // if the end key is on the right of the large row key, set the start key to be large row key open
        if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
            RowRange afterSplit = range
                    .toBuilder()
                    .setStartKeyOpen(largeRowKey)
                    .build();
            rowRanges.add(afterSplit);
        }
        return rowRanges;
    }

This should cover all edge cases.
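
The same splitting idea can be tried without the Bigtable classes. The sketch below is a simplified model, not the code from this PR: `Range` is a hypothetical type over `String` keys with explicit open/closed flags, bounds are always present, and keys are compared with `String.compareTo` rather than the byte-wise comparator used above.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitOnKeySketch {
  /** Hypothetical simplified range over String keys; each bound is open or closed. */
  static final class Range {
    final String start;
    final boolean startOpen;
    final String end;
    final boolean endOpen;

    Range(String start, boolean startOpen, String end, boolean endOpen) {
      this.start = start;
      this.startOpen = startOpen;
      this.end = end;
      this.endOpen = endOpen;
    }

    @Override
    public String toString() {
      return (startOpen ? "(" : "[") + start + "," + end + (endOpen ? ")" : "]");
    }
  }

  /** Removes a single key from a range, returning zero, one, or two ranges. */
  static List<Range> splitOnKey(Range range, String key) {
    List<Range> out = new ArrayList<>();
    // The key lies entirely to the right or left of the range: keep the range as is.
    if (range.end.compareTo(key) < 0 || range.start.compareTo(key) > 0) {
      out.add(range);
      return out;
    }
    // Keep the part before the key, ending just before it (open end).
    if (range.start.compareTo(key) < 0) {
      out.add(new Range(range.start, range.startOpen, key, true));
    }
    // Keep the part after the key, starting just after it (open start).
    if (range.end.compareTo(key) > 0) {
      out.add(new Range(key, true, range.end, range.endOpen));
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(splitOnKey(new Range("r1", false, "r4", false), "r2")); // [[r1,r2), (r2,r4]]
    System.out.println(splitOnKey(new Range("r2", false, "r4", false), "r2")); // [(r2,r4]]
    System.out.println(splitOnKey(new Range("r2", false, "r2", false), "r2")); // []
    System.out.println(splitOnKey(new Range("r5", false, "r9", false), "r2")); // [[r5,r9]]
  }
}
```

A range that contains only the removed key collapses to an empty list, which is why a caller has to handle the case where nothing is left to read.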

Member Author

Done. Testing now.

@@ -49,6 +52,44 @@
public final class RowSetUtil {
private RowSetUtil() {}

public static RowSet createSplitRanges(
Contributor

I think you should also resume from the last seen row key.

Contributor

@mutianf mutianf Feb 19, 2025

I think you want this:

    public static RowSet eraseLargeRow(RowSet rowSet, ByteString lastSeenRowKey, ByteString largeRowKey, boolean fromStart) {
        // first, remove everything we've already read from the RowSet
        RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);

        // return null if we've read everything
        if (remaining == null) {
            return null;
        }

        // second, remove the large row key from the remaining RowSet
        RowSet.Builder newRowSet = RowSet.newBuilder();

        // remove large row key from point reads
        remaining.getRowKeysList().stream().filter(k -> !k.equals(largeRowKey)).forEach(newRowSet::addRowKeys);

        // remove large row key from row ranges
        for (RowRange range : remaining.getRowRangesList()) {
            List<RowRange> afterSplit = splitOnLargeRowKey(range, largeRowKey);
            if (!afterSplit.isEmpty()) {
                afterSplit.forEach(newRowSet::addRowRanges);
            }
        }
        
        if (newRowSet.getRowKeysList().isEmpty() && newRowSet.getRowRangesList().isEmpty()) {
            return null;
        }

        return newRowSet.build();
    }

splitOnLargeRowKey is in my other comment.

And in your resumption strategy, you wouldn't need to keep the previous request anymore, because that'll be removed by the RowSet remaining = erase(rowSet, lastSeenRowKey, fromStart);

Member Author

We need to keep the previous request to handle several large row keys in a row.
If we don't keep it, and the previous request failed because of a large row and the next request also fails because of a large row, then we remove only the second large row key. The first one is still in the request, so the request keeps failing.

Example:
request [r1,r4]
r1 - success key
r2 - large-row key
r3 - large-row key
r4 - large-row key

original request -> [r1,r4]
r1 is read, r2 fails -> request becomes (r1,r2),(r2,r4]
r3 fails -> request becomes (r1,r3),(r3,r4], which will now fail for r2 again (if the previous request or the previously failed row keys are not cached)
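
The scenario above can be simulated with a small self-contained sketch. It is a simplified model under stated assumptions, not the resumption strategy from this PR: requests are sets of discrete `String` keys instead of row ranges, and `serve` and `read` are hypothetical helpers. With `rememberFailures` set to `false`, only the most recent failing key is removed from each retry, which reproduces the loop between r2 and r3.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class LargeRowRetrySketch {
  /** Delivers rows in key order; returns the first large row hit, or null if none. */
  static String serve(TreeSet<String> request, Set<String> largeRows, List<String> delivered) {
    for (String key : request) {
      if (largeRows.contains(key)) {
        return key;
      }
      delivered.add(key);
    }
    return null;
  }

  /** Returns the number of attempts sent, or -1 if the read did not finish in maxAttempts. */
  static int read(
      TreeSet<String> original,
      Set<String> largeRows,
      boolean rememberFailures,
      int maxAttempts,
      List<String> delivered) {
    Set<String> skipped = new HashSet<>();
    String lastSeen = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Rebuild the request from the original: drop rows already read, then skipped keys.
      TreeSet<String> request = new TreeSet<>(original);
      if (lastSeen != null) {
        request.headSet(lastSeen, true).clear();
      }
      request.removeAll(skipped);
      if (request.isEmpty()) {
        return attempt - 1; // nothing left to read, so no further attempt is sent
      }
      int before = delivered.size();
      String failed = serve(request, largeRows, delivered);
      if (delivered.size() > before) {
        lastSeen = delivered.get(delivered.size() - 1);
      }
      if (failed == null) {
        return attempt;
      }
      if (!rememberFailures) {
        skipped.clear(); // forget earlier large rows: models not caching them
      }
      skipped.add(failed);
    }
    return -1;
  }

  public static void main(String[] args) {
    TreeSet<String> rows = new TreeSet<>(Arrays.asList("r1", "r2", "r3", "r4"));
    Set<String> large = new HashSet<>(Arrays.asList("r2", "r3", "r4"));
    System.out.println(read(rows, large, true, 10, new ArrayList<>())); // 3
    System.out.println(read(rows, large, false, 10, new ArrayList<>())); // -1
  }
}
```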

@@ -0,0 +1,125 @@
/*
* Copyright 2021 Google LLC
Contributor

Please remove this file

Member Author

@sarthakbhutani sarthakbhutani left a comment

add comments

Contributor

mutianf commented Feb 21, 2025

/gcbrun

@mutianf mutianf added the automerge Merge the pull request once unit tests and other checks pass. label Feb 21, 2025
@gcf-merge-on-green gcf-merge-on-green bot merged commit cd7f82e into googleapis:main Feb 21, 2025
20 checks passed
@gcf-merge-on-green gcf-merge-on-green bot removed the automerge Merge the pull request once unit tests and other checks pass. label Feb 21, 2025