Skip to content

Commit

Permalink
feat: skip large rows (#2482)
Browse files Browse the repository at this point in the history
Tasks remaining -
- [ ] Update the read request API so that it skips large rows by internally calling readLargeRowsCallable()
- [ ] Expose the row keys of skipped large rows to the client (for example via a side channel, a dead-letter queue, or another mechanism)


Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed
- [ ] All new data plane features have a completed end to end testing plan

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
sarthakbhutani authored Feb 21, 2025
1 parent 0a1fbb4 commit cd7f82e
Show file tree
Hide file tree
Showing 9 changed files with 1,009 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,53 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
return stub.readRowsCallable();
}

/**
 * Streams back the results of the read query, omitting any rows that are too large to be read.
 * The returned callable object allows for customization of api invocation.
 *
 * <p>Sample code:
 *
 * <pre>{@code
 * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
 *   String tableId = "[TABLE]";
 *
 *   Query query = Query.create(tableId)
 *          .range("[START KEY]", "[END KEY]")
 *          .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
 *
 *   // Iterator style
 *   try {
 *     for(Row row : bigtableDataClient.skipLargeRowsCallable().call(query)) {
 *       // Do something with row
 *     }
 *   } catch (NotFoundException e) {
 *     System.out.println("Tried to read a non-existent table");
 *   } catch (RuntimeException e) {
 *     e.printStackTrace();
 *   }
 *
 *   // Sync style
 *   try {
 *     List<Row> rows = bigtableDataClient.skipLargeRowsCallable().all().call(query);
 *   } catch (NotFoundException e) {
 *     System.out.println("Tried to read a non-existent table");
 *   } catch (RuntimeException e) {
 *     e.printStackTrace();
 *   }
 *
 *   // etc
 * }
 * }</pre>
 *
 * @see ServerStreamingCallable For call styles.
 * @see Query For query options.
 * @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
 */
@InternalApi("only to be used by Bigtable beam connector")
public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
  // Delegate straight to the stub; the skipping behavior is built into the stub's callable chain.
  ServerStreamingCallable<Query, Row> callable = stub.skipLargeRowsCallable();
  return callable;
}

/**
* Streams back the results of the query. This callable allows for customization of the logical
* representation of a row. It's meant for advanced use cases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,36 @@
public final class RowSetUtil {
private RowSetUtil() {}

/**
 * Returns a copy of {@code rowSet} with the single row key {@code excludePoint} removed from
 * both its point reads and its row ranges, or {@code null} when nothing remains to read.
 */
public static RowSet eraseLargeRow(RowSet rowSet, ByteString excludePoint) {

  RowSet.Builder builder = RowSet.newBuilder();

  if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
    // An empty RowSet means "scan everything": replace it with the two open-ended ranges
    // (-inf, excludePoint) and (excludePoint, +inf) so only the excluded key is dropped.
    builder.addRowRanges(RowRange.newBuilder().setEndKeyOpen(excludePoint).build());
    builder.addRowRanges(RowRange.newBuilder().setStartKeyOpen(excludePoint).build());
  }

  // Keep every point read except the excluded key.
  for (ByteString rowKey : rowSet.getRowKeysList()) {
    if (!rowKey.equals(excludePoint)) {
      builder.addRowKeys(rowKey);
    }
  }

  // Split each range around the excluded key and keep the surviving pieces.
  for (RowRange rowRange : rowSet.getRowRangesList()) {
    List<RowRange> pieces = splitOnLargeRowKey(rowRange, excludePoint);
    if (pieces != null) {
      for (RowRange piece : pieces) {
        builder.addRowRanges(piece);
      }
    }
  }

  // Erasing the key may have consumed the whole set; signal that with null.
  return (builder.getRowKeysList().isEmpty() && builder.getRowRangesList().isEmpty())
      ? null
      : builder.build();
}

/**
* Removes all the keys and range parts that fall on or before the splitPoint.
*
Expand Down Expand Up @@ -125,6 +155,40 @@ private static RowRange truncateRange(RowRange range, ByteString split, boolean
return newRange.build();
}

/**
 * Erases the single key {@code largeRowKey} from {@code range}, returning the surviving piece(s).
 *
 * <p>Returns the range untouched when the key lies strictly outside it; otherwise returns up to
 * two sub-ranges, one ending just before the key and one starting just after it. The returned
 * list is empty only when the range covered nothing but the key itself.
 *
 * <p>NOTE(review): assumes {@code StartPoint.extract}/{@code EndPoint.extract} map unbounded
 * range endpoints to sentinel values that {@code ByteStringComparator} orders correctly against
 * real keys — TODO confirm against RowSetUtil's StartPoint/EndPoint definitions.
 */
private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
  List<RowRange> rowRanges = new ArrayList<>();

  ByteString startKey = StartPoint.extract(range).value;
  ByteString endKey = EndPoint.extract(range).value;

  // if end key is on the left of large row key, don't split
  if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
    rowRanges.add(range);
    return rowRanges;
  }

  // if start key is on the right of the large row key, don't split
  if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
    rowRanges.add(range);
    return rowRanges;
  }

  // if start key is on the left of the large row key, keep the prefix of the range by setting
  // the end key to be large-row-key open (excludes the large row itself)
  if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
    RowRange beforeSplit = range.toBuilder().setEndKeyOpen(largeRowKey).build();
    rowRanges.add(beforeSplit);
  }

  // if the end key is on the right of the large row key, keep the suffix of the range by setting
  // the start key to be large-row-key open (excludes the large row itself)
  if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
    RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
    rowRanges.add(afterSplit);
  }
  return rowRanges;
}

/**
* Splits the provided {@link RowSet} into segments partitioned by the provided {@code
* splitPoints}. The split points will be treated as start keys of the segments. The primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
Expand Down Expand Up @@ -176,6 +177,9 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final DynamicFlowControlStats bulkMutationDynamicFlowControlStats;

private final ServerStreamingCallable<Query, Row> readRowsCallable;

private final ServerStreamingCallable<Query, Row> skipLargeRowsCallable;

private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
@Deprecated private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
Expand Down Expand Up @@ -304,6 +308,7 @@ public EnhancedBigtableStub(
this.bulkMutationDynamicFlowControlStats = new DynamicFlowControlStats();

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
skipLargeRowsCallable = createSkipLargeRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
Expand Down Expand Up @@ -445,6 +450,7 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
return createReadRowsBaseCallable(
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
}

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
Expand Down Expand Up @@ -515,6 +521,96 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
 * Creates a callable chain to handle streaming ReadRows RPCs. This chain skips large rows
 * internally (on a large-row failure, the retry resumption strategy erases the offending key
 * from the request and resumes the scan). The chain will:
 *
 * <ul>
 *   <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
 *   <li>Dispatch the RPC with {@link ReadRowsRequest}.
 *   <li>Upon receiving the response stream, it will merge the {@link
 *       com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
 *       implementation can be configured by the {@code rowAdapter} parameter.
 *   <li>Add bigtable tracer for tracking bigtable specific metrics.
 *   <li>Retry/resume on failure (retries for retryable error codes, connection errors and skip
 *       large row keys)
 *   <li>Filter out marker rows.
 *   <li>Add tracing &amp; metrics.
 * </ul>
 */
private <ReqT, RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsCallable(
    RowAdapter<RowT> rowAdapter) {

  // Reuse the user-configured ReadRows settings (retry codes, timeouts) for this chain.
  ServerStreamingCallSettings<ReqT, Row> readRowsSettings =
      (ServerStreamingCallSettings<ReqT, Row>) settings.readRowsSettings();

  // Raw gRPC callable: sends the request and extracts routing params for the header.
  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
      GrpcRawCallableFactory.createServerStreamingCallable(
          GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
              .setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
              .setParamsExtractor(
                  r ->
                      composeRequestParams(
                          r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
              .build(),
          readRowsSettings.getRetryableCodes());

  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withStatsHeaders =
      new StatsHeadersServerStreamingCallable<>(base);

  // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
  // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
  // which by default is not retryable. Convert the exception so it can be retried in the client.
  ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
      new ConvertExceptionCallable<>(withStatsHeaders);

  // Assemble CellChunks into logical rows using the caller-supplied adapter.
  ServerStreamingCallable<ReadRowsRequest, RowT> merging =
      new RowMergingCallable<>(convertException, rowAdapter);

  // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
  // ReadRowsRequest -> ReadRowsResponse callable).
  // We override the resumption strategy to use LargeReadRowsResumptionStrategy here (which skips
  // the large rows) instead of ReadRowResumptionStrategy
  ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
      ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
          .setResumptionStrategy(new LargeReadRowsResumptionStrategy<>(rowAdapter))
          .setRetryableCodes(readRowsSettings.getRetryableCodes())
          .setRetrySettings(readRowsSettings.getRetrySettings())
          .setIdleTimeout(readRowsSettings.getIdleTimeout())
          .setWaitTimeout(readRowsSettings.getWaitTimeout())
          .build();

  // Enforce idle/wait timeouts on the stream.
  ServerStreamingCallable<ReadRowsRequest, RowT> watched =
      Callables.watched(merging, innerSettings, clientContext);

  ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
      new BigtableTracerStreamingCallable<>(watched);

  // Retry logic is split into 2 parts to workaround a rare edge case described in
  // ReadRowsRetryCompletedCallable
  ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
      new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

  // Large-row-aware retrying (see largeRowWithRetries): also honors RetryInfo and adds the
  // routing-cookie wrapper when enabled.
  ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
      largeRowWithRetries(retrying1, innerSettings);

  // Drop synthetic marker rows emitted by the resumption strategy before they reach the user.
  ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
      new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

  // Convert the user-facing Query into a ReadRowsRequest.
  ServerStreamingCallable<Query, RowT> readRowsUserCallable =
      new ReadRowsUserCallable<>(readRowsCallable, requestContext);

  SpanName span = getSpanName("ReadRows");
  ServerStreamingCallable<Query, RowT> traced =
      new TracedServerStreamingCallable<>(
          readRowsUserCallable, clientContext.getTracerFactory(), span);

  return traced.withDefaultCallContext(
      clientContext
          .getDefaultCallContext()
          .withRetrySettings(readRowsSettings.getRetrySettings()));
}

/**
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
* batcher. The chain will:
Expand Down Expand Up @@ -1282,6 +1378,22 @@ private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withR
return retrying;
}

/**
 * Wraps {@code innerCallable} with the large-row-aware retrying algorithm, optionally adding the
 * routing-cookie callable when {@code settings.getEnableRoutingCookie()} is set.
 */
private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> largeRowWithRetries(
    ServerStreamingCallable<RequestT, ResponseT> innerCallable,
    ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {

  // Retrying algorithm in retryingForLargeRows also takes RetryInfo into consideration, so we
  // skip the check for settings.getEnableRetryInfo here
  ServerStreamingCallable<RequestT, ResponseT> retrying =
      com.google.cloud.bigtable.gaxx.retrying.Callables.retryingForLargeRows(
          innerCallable, serverStreamingCallSettings, clientContext);

  return settings.getEnableRoutingCookie()
      ? new CookiesServerStreamingCallable<>(retrying)
      : retrying;
}

// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand All @@ -1290,6 +1402,11 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

/**
 * Returns a streaming read rows callable that skips large rows.
 *
 * <p>The returned callable is built once in the constructor by {@code
 * createSkipLargeRowsCallable(new DefaultRowAdapter())}; this accessor only hands it out.
 */
public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
  return skipLargeRowsCallable;
}

/** Return a point read callable */
public UnaryCallable<Query, Row> readRowCallable() {
return readRowCallable;
Expand Down
Loading

0 comments on commit cd7f82e

Please sign in to comment.