Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: skip large rows #2482

Merged
merged 31 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
81e3f04
feat: skip large rows
sarthakbhutani Feb 13, 2025
b796d8c
merged commit
Feb 4, 2025
0eb2269
feat: skip large rows | wip - added large row skip method - with unit…
sarthakbhutani Feb 13, 2025
bf7c17f
feat: skip large rows | integration tests are working with new AFE be…
sarthakbhutani Feb 13, 2025
3cf3940
feat: skip large rows | formatted code
sarthakbhutani Feb 13, 2025
b29e73b
feat: skip large rows | corrected client.readrows() method
sarthakbhutani Feb 13, 2025
73ca04b
feat: skip large rows | feedback incorp
sarthakbhutani Feb 13, 2025
5b0eced
feat: skip large rows | feedback incorp
sarthakbhutani Feb 13, 2025
192e7e1
feat: skip large rows | feedback incorp & code formatting
sarthakbhutani Feb 13, 2025
76e8775
feat: skip large rows | removed comments
sarthakbhutani Feb 13, 2025
e49b332
Merge branch 'main' into large-row-skip
sarthakbhutani Feb 13, 2025
e2d0224
Merge branch 'main' into large-row-skip
mutianf Feb 13, 2025
e5a6f17
feedback incorp + added more test cases for large row read
sarthakbhutani Feb 17, 2025
82b04e8
deps: update dependency com.google.cloud:sdk-platform-java-config to …
renovate-bot Feb 14, 2025
4fec415
deps: update dependency com.google.cloud:gapic-libraries-bom to v1.52…
renovate-bot Feb 14, 2025
5bd17c3
chore(main): release 2.52.0 (#2479)
release-please[bot] Feb 14, 2025
8bfc016
feat: skip large rows | wip - added large row skip method - with unit…
sarthakbhutani Feb 13, 2025
21fc404
LargeReadRowsResumptionStrategy inherits BigtableStreamResumptionStra…
sarthakbhutani Feb 17, 2025
a562d68
added base64 decoding on metadata of large-read-row error
sarthakbhutani Feb 18, 2025
45f88f9
code formatting done
sarthakbhutani Feb 18, 2025
85274c6
code formatted
sarthakbhutani Feb 18, 2025
66ec4b5
typo fixed
sarthakbhutani Feb 18, 2025
7513c3c
ignored large row read IT for emulator
sarthakbhutani Feb 18, 2025
70a9329
Merge branch 'main' into large-row-skip
sarthakbhutani Feb 18, 2025
e810c36
Merge branch 'main' into large-row-skip
mutianf Feb 18, 2025
4d926e4
feedback incorporation
sarthakbhutani Feb 21, 2025
e656186
feedback incorporation
sarthakbhutani Feb 21, 2025
83ea1a8
feedback incorporation
sarthakbhutani Feb 21, 2025
f5068c8
feedback incorporation
sarthakbhutani Feb 21, 2025
7ed1d83
removed duplicate tests
sarthakbhutani Feb 21, 2025
749fa2a
removed typo
sarthakbhutani Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,53 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
return stub.readRowsCallable();
}

/**
 * Streams back the results of the read query and omits large rows. The returned callable object
 * allows for customization of API invocation.
 *
 * <p>Sample code:
 *
 * <pre>{@code
 * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
 *   String tableId = "[TABLE]";
 *
 *   Query query = Query.create(tableId)
 *          .range("[START KEY]", "[END KEY]")
 *          .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
 *
 *   // Iterator style
 *   try {
 *     for(Row row : bigtableDataClient.skipLargeRowsCallable().call(query)) {
 *       // Do something with row
 *     }
 *   } catch (NotFoundException e) {
 *     System.out.println("Tried to read a non-existent table");
 *   } catch (RuntimeException e) {
 *     e.printStackTrace();
 *   }
 *
 *   // Sync style
 *   try {
 *     List<Row> rows = bigtableDataClient.skipLargeRowsCallable().all().call(query);
 *   } catch (NotFoundException e) {
 *     System.out.println("Tried to read a non-existent table");
 *   } catch (RuntimeException e) {
 *     e.printStackTrace();
 *   }
 *
 *   // etc
 * }
 * }</pre>
 *
 * @see ServerStreamingCallable For call styles.
 * @see Query For query options.
 * @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
 */
@InternalApi("only to be used by Bigtable beam connector")
public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
  // Thin delegate: the callable chain (including the large-row-skipping resumption strategy)
  // is assembled by the stub.
  return stub.skipLargeRowsCallable();
}

/**
* Streams back the results of the query. This callable allows for customization of the logical
* representation of a row. It's meant for advanced use cases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,36 @@
public final class RowSetUtil {
private RowSetUtil() {}

/**
 * Returns a copy of {@code rowSet} with the row key {@code excludePoint} removed, or {@code null}
 * if nothing remains to be read.
 */
public static RowSet eraseLargeRow(RowSet rowSet, ByteString excludePoint) {
  RowSet.Builder result = RowSet.newBuilder();

  boolean isFullScan =
      rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty();
  if (isFullScan) {
    // An empty RowSet means "read everything": replace it with the two open-ended ranges
    // (-inf, excludePoint) and (excludePoint, +inf).
    result.addRowRanges(RowRange.newBuilder().setEndKeyOpen(excludePoint).build());
    result.addRowRanges(RowRange.newBuilder().setStartKeyOpen(excludePoint).build());
  }

  // Keep every point read except the excluded key.
  for (ByteString rowKey : rowSet.getRowKeysList()) {
    if (!rowKey.equals(excludePoint)) {
      result.addRowKeys(rowKey);
    }
  }

  // Split each range around the excluded key.
  for (RowRange rowRange : rowSet.getRowRangesList()) {
    List<RowRange> pieces = splitOnLargeRowKey(rowRange, excludePoint);
    if (pieces != null) {
      for (RowRange piece : pieces) {
        result.addRowRanges(piece);
      }
    }
  }

  // Nothing left to read: signal the caller with null rather than returning an empty RowSet
  // (which would mean a full table scan).
  if (result.getRowKeysList().isEmpty() && result.getRowRangesList().isEmpty()) {
    return null;
  }
  return result.build();
}

/**
* Removes all the keys and range parts that fall on or before the splitPoint.
*
Expand Down Expand Up @@ -125,6 +155,40 @@ private static RowRange truncateRange(RowRange range, ByteString split, boolean
return newRange.build();
}

/**
 * Splits {@code range} into the portions strictly before and strictly after {@code largeRowKey},
 * erasing the key itself. Returns the range untouched when the key falls outside of it.
 *
 * <p>NOTE(review): bound normalization (open/closed, unbounded) is delegated to
 * StartPoint/EndPoint.extract — the comparisons below rely on those helpers.
 */
private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
  List<RowRange> pieces = new ArrayList<>();

  ByteString startKey = StartPoint.extract(range).value;
  ByteString endKey = EndPoint.extract(range).value;

  int endVsKey = ByteStringComparator.INSTANCE.compare(endKey, largeRowKey);
  int startVsKey = ByteStringComparator.INSTANCE.compare(startKey, largeRowKey);

  // The key lies entirely before or after the range: nothing to erase.
  if (endVsKey < 0 || startVsKey > 0) {
    pieces.add(range);
    return pieces;
  }

  // Left piece: everything before largeRowKey (end becomes largeRowKey, open).
  if (startVsKey < 0) {
    pieces.add(range.toBuilder().setEndKeyOpen(largeRowKey).build());
  }

  // Right piece: everything after largeRowKey (start becomes largeRowKey, open).
  if (endVsKey > 0) {
    pieces.add(range.toBuilder().setStartKeyOpen(largeRowKey).build());
  }

  return pieces;
}

/**
* Splits the provided {@link RowSet} into segments partitioned by the provided {@code
* splitPoints}. The split points will be treated as start keys of the segments. The primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
Expand Down Expand Up @@ -176,6 +177,9 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final DynamicFlowControlStats bulkMutationDynamicFlowControlStats;

private final ServerStreamingCallable<Query, Row> readRowsCallable;

private final ServerStreamingCallable<Query, Row> skipLargeRowsCallable;

private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
@Deprecated private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
Expand Down Expand Up @@ -304,6 +308,7 @@ public EnhancedBigtableStub(
this.bulkMutationDynamicFlowControlStats = new DynamicFlowControlStats();

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
skipLargeRowsCallable = createSkipLargeRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
Expand Down Expand Up @@ -445,6 +450,7 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
return createReadRowsBaseCallable(
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
}

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
Expand Down Expand Up @@ -515,6 +521,96 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
 * Creates a callable chain to handle streaming ReadRows RPCs. This chain skips the large rows
 * internally. The chain will:
 *
 * <ul>
 *   <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
 *   <li>Dispatch the RPC with {@link ReadRowsRequest}.
 *   <li>Upon receiving the response stream, it will merge the {@link
 *       com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
 *       implementation can be configured in by the {@code rowAdapter} parameter.
 *   <li>Add bigtable tracer for tracking bigtable specific metrics.
 *   <li>Retry/resume on failure (retries for retryable error codes, connection errors and skip
 *       large row keys)
 *   <li>Filter out marker rows.
 *   <li>Add tracing & metrics.
 * </ul>
 */
private <ReqT, RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsCallable(
    RowAdapter<RowT> rowAdapter) {

  // Reuse the user-configured ReadRows settings (retryable codes, retry/timeout settings).
  // The unchecked cast only relabels the generic parameters; nothing is converted at runtime.
  ServerStreamingCallSettings<ReqT, Row> readRowsSettings =
      (ServerStreamingCallSettings<ReqT, Row>) settings.readRowsSettings();

  // Innermost link: the raw gRPC server-streaming callable for Bigtable.ReadRows.
  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
      GrpcRawCallableFactory.createServerStreamingCallable(
          GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
              .setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
              .setParamsExtractor(
                  r ->
                      composeRequestParams(
                          r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
              .build(),
          readRowsSettings.getRetryableCodes());

  // Attach the resource headers used for stats attribution.
  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withStatsHeaders =
      new StatsHeadersServerStreamingCallable<>(base);

  // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
  // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
  // which by default is not retryable. Convert the exception so it can be retried in the client.
  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
      new ConvertExceptionCallable<>(withStatsHeaders);

  // Reassemble CellChunks into logical rows of type RowT via the supplied rowAdapter.
  ServerStreamingCallable<ReadRowsRequest, RowT> merging =
      new RowMergingCallable<>(convertException, rowAdapter);

  // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
  // ReadRowsRequest -> ReadRowsResponse callable).
  // We override the resumption strategy to use LargeReadRowsResumptionStrategy here (which skips
  // the large rows) instead of ReadRowResumptionStrategy
  ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
      ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
          .setResumptionStrategy(new LargeReadRowsResumptionStrategy<>(rowAdapter))
          .setRetryableCodes(readRowsSettings.getRetryableCodes())
          .setRetrySettings(readRowsSettings.getRetrySettings())
          .setIdleTimeout(readRowsSettings.getIdleTimeout())
          .setWaitTimeout(readRowsSettings.getWaitTimeout())
          .build();

  // Enforce idle/wait timeouts on the stream.
  ServerStreamingCallable<ReadRowsRequest, RowT> watched =
      Callables.watched(merging, innerSettings, clientContext);

  // Record Bigtable-specific client metrics around the stream.
  ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
      new BigtableTracerStreamingCallable<>(watched);

  // Retry logic is split into 2 parts to workaround a rare edge case described in
  // ReadRowsRetryCompletedCallable
  ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
      new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

  // Large-row-aware retry loop (see largeRowWithRetries): resumes the scan using the
  // resumption strategy configured in innerSettings.
  ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
      largeRowWithRetries(retrying1, innerSettings);

  // Filter out marker rows before handing results to the caller.
  ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
      new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

  // Outermost user-facing link: translate Query -> ReadRowsRequest.
  ServerStreamingCallable<Query, RowT> readRowsUserCallable =
      new ReadRowsUserCallable<>(readRowsCallable, requestContext);

  // NOTE(review): this shares the "ReadRows" span name with the regular read path — confirm
  // that aggregating both paths under one span is intentional.
  SpanName span = getSpanName("ReadRows");
  ServerStreamingCallable<Query, RowT> traced =
      new TracedServerStreamingCallable<>(
          readRowsUserCallable, clientContext.getTracerFactory(), span);

  return traced.withDefaultCallContext(
      clientContext
          .getDefaultCallContext()
          .withRetrySettings(readRowsSettings.getRetrySettings()));
}

/**
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
* batcher. The chain will:
Expand Down Expand Up @@ -1282,6 +1378,22 @@ private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withR
return retrying;
}

/**
 * Wraps {@code innerCallable} with the large-row-aware retry algorithm and, when routing cookies
 * are enabled, with the routing-cookie callable.
 */
private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> largeRowWithRetries(
    ServerStreamingCallable<RequestT, ResponseT> innerCallable,
    ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {

  // retryingForLargeRows already takes RetryInfo into consideration, so there is no need to
  // consult settings.getEnableRetryInfo here.
  ServerStreamingCallable<RequestT, ResponseT> withRetries =
      com.google.cloud.bigtable.gaxx.retrying.Callables.retryingForLargeRows(
          innerCallable, serverStreamingCallSettings, clientContext);

  return settings.getEnableRoutingCookie()
      ? new CookiesServerStreamingCallable<>(withRetries)
      : withRetries;
}

// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand All @@ -1290,6 +1402,11 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

/**
 * Returns a streaming read rows callable that skips large rows.
 *
 * <p>Accessor for the callable chain built once by {@code createSkipLargeRowsCallable} during
 * stub construction.
 */
public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
  return skipLargeRowsCallable;
}

/** Return a point read callable */
public UnaryCallable<Query, Row> readRowCallable() {
return readRowCallable;
Expand Down
Loading
Loading