diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index 61f51924f1..4deddc5799 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -1279,6 +1279,53 @@ public ServerStreamingCallable readRowsCallable() { return stub.readRowsCallable(); } + /** + * Streams back the results of the read query & omits large rows. The returned callable object + * allows for customization of api invocation. + * + *

Sample code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *   String tableId = "[TABLE]";
+   *
+   *   Query query = Query.create(tableId)
+   *          .range("[START KEY]", "[END KEY]")
+   *          .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
+   *
+   *   // Iterator style
+   *   try {
+   *     for(Row row : bigtableDataClient.skipLargeRowsCallable().call(query)) {
+   *       // Do something with row
+   *     }
+   *   } catch (NotFoundException e) {
+   *     System.out.println("Tried to read a non-existent table");
+   *   } catch (RuntimeException e) {
+   *     e.printStackTrace();
+   *   }
+   *
+   *   // Sync style
+   *   try {
+   *     List<Row> rows = bigtableDataClient.skipLargeRowsCallable().all().call(query);
+   *   } catch (NotFoundException e) {
+   *     System.out.println("Tried to read a non-existent table");
+   *   } catch (RuntimeException e) {
+   *     e.printStackTrace();
+   *   }
+   *
+   *   // etc
+   * }
+   * }
+   *
+   * @see ServerStreamingCallable For call styles.
+   * @see Query For query options.
+   * @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
+   */
+  @InternalApi("only to be used by Bigtable beam connector")
+  public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
+    return stub.skipLargeRowsCallable();
+  }
+
   /**
    * Streams back the results of the query. This callable allows for customization of the logical
    * representation of a row. It's meant for advanced use cases.
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
index 68f81cc56f..a0d079e240 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java
@@ -49,6 +49,43 @@ public final class RowSetUtil {
 
   private RowSetUtil() {}
 
+  /**
+   * Removes the {@code excludePoint} row key from the {@code RowSet}.
+   *
+   * <p>A {@code rowSet} without any keys or ranges is replaced by the two open-ended ranges on
+   * either side of {@code excludePoint}.
+   *
+   * @return the remaining row set, or {@code null} if no keys or ranges are left
+   */
+  public static RowSet eraseLargeRow(RowSet rowSet, ByteString excludePoint) {
+
+    RowSet.Builder newRowSet = RowSet.newBuilder();
+
+    if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
+      // querying range (, excludePoint) and (excludePoint, )
+      newRowSet.addRowRanges(RowRange.newBuilder().setEndKeyOpen(excludePoint).build());
+      newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(excludePoint).build());
+    }
+
+    // remove large row key from point reads
+    rowSet.getRowKeysList().stream()
+        .filter(k -> !k.equals(excludePoint))
+        .forEach(newRowSet::addRowKeys);
+
+    // Handle ranges
+    for (RowRange rowRange : rowSet.getRowRangesList()) {
+      List<RowRange> afterSplit = splitOnLargeRowKey(rowRange, excludePoint);
+      if (afterSplit != null && !afterSplit.isEmpty()) {
+        afterSplit.forEach(newRowSet::addRowRanges);
+      }
+    }
+
+    if (newRowSet.getRowKeysList().isEmpty() && newRowSet.getRowRangesList().isEmpty()) {
+      return null;
+    }
+    return newRowSet.build();
+  }
+
   /**
    * Removes all the keys and range parts that fall on or before the splitPoint.
    *
@@ -125,6 +162,49 @@ private static RowRange truncateRange(RowRange range, ByteString split, boolean
     return newRange.build();
   }
 
+  /**
+   * Erases {@code largeRowKey} from {@code range}.
+   *
+   * @return the parts of {@code range} that remain: the range itself when the key lies outside
+   *     of it, otherwise the part before and/or after the key (an empty list if nothing is left)
+   */
+  private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
+    List<RowRange> rowRanges = new ArrayList<>();
+
+    ByteString startKey = StartPoint.extract(range).value;
+    ByteString endKey = EndPoint.extract(range).value;
+
+    // An empty end key means the range is unbounded on the right: it lies after every row key
+    // and must not be compared as raw bytes (empty bytes sort before every key).
+    boolean unboundedEnd = endKey.isEmpty();
+
+    // if end key is on the left of large row key, don't split
+    if (!unboundedEnd && ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
+      rowRanges.add(range);
+      return rowRanges;
+    }
+
+    // if start key is on the right of the large row key, don't split
+    if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
+      rowRanges.add(range);
+      return rowRanges;
+    }
+
+    // if start key is on the left of the large row key, set the end key to be large row key open
+    if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
+      RowRange beforeSplit = range.toBuilder().setEndKeyOpen(largeRowKey).build();
+      rowRanges.add(beforeSplit);
+    }
+
+    // if the end key is on the right of the large row key (or unbounded), set the start key to be
+    // large row key open
+    if (unboundedEnd || ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
+      RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
+      rowRanges.add(afterSplit);
+    }
+    return rowRanges;
+  }
+
   /**
    * Splits the provided {@link RowSet} into segments partitioned by the provided {@code
    * splitPoints}. The split points will be treated as start keys of the segments.
The primary diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 46377fbc41..d705989d0b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -110,6 +110,7 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; +import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; @@ -176,6 +177,9 @@ public class EnhancedBigtableStub implements AutoCloseable { private final DynamicFlowControlStats bulkMutationDynamicFlowControlStats; private final ServerStreamingCallable readRowsCallable; + + private final ServerStreamingCallable skipLargeRowsCallable; + private final UnaryCallable readRowCallable; private final UnaryCallable> bulkReadRowsCallable; @Deprecated private final UnaryCallable> sampleRowKeysCallable; @@ -304,6 +308,7 @@ public EnhancedBigtableStub( this.bulkMutationDynamicFlowControlStats = new DynamicFlowControlStats(); readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); + skipLargeRowsCallable = createSkipLargeRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter()); sampleRowKeysCallable = createSampleRowKeysCallable(); 
@@ -445,6 +450,7 @@ private ServerStreamingCallable createReadRo return createReadRowsBaseCallable( readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy(rowAdapter)); } + /** * Creates a callable chain to handle ReadRows RPCs. The chain will: * @@ -515,6 +521,96 @@ private ServerStreamingCallable createReadRo return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } + /** + * Creates a callable chain to handle streaming ReadRows RPCs. This chain skips the large rows + * internally. The chain will: + * + *
    + *
  • Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}. + *
  • Dispatch the RPC with {@link ReadRowsRequest}. + *
  • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into logical rows. The actual row + * implementation can be configured by the {@code rowAdapter} parameter. + *
  • Add bigtable tracer for tracking bigtable specific metrics. + *
  • Retry/resume on failure (retries for retryable error codes, connection errors and skip + * large row keys) + *
  • Filter out marker rows. + *
  • Add tracing & metrics. + *
+   */
+  private ServerStreamingCallable createSkipLargeRowsCallable(
+      RowAdapter rowAdapter) {
+
+    ServerStreamingCallSettings readRowsSettings =
+        (ServerStreamingCallSettings) settings.readRowsSettings();
+
+    ServerStreamingCallable base =
+        GrpcRawCallableFactory.createServerStreamingCallable(
+            GrpcCallSettings.newBuilder()
+                .setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
+                .setParamsExtractor(
+                    r ->
+                        composeRequestParams(
+                            r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
+                .build(),
+            readRowsSettings.getRetryableCodes());
+
+    ServerStreamingCallable withStatsHeaders =
+        new StatsHeadersServerStreamingCallable<>(base);
+
+    // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
+    // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
+    // which by default is not retryable. Convert the exception so it can be retried in the client.
+    ServerStreamingCallable convertException =
+        new ConvertExceptionCallable<>(withStatsHeaders);
+
+    ServerStreamingCallable merging =
+        new RowMergingCallable<>(convertException, rowAdapter);
+
+    // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
+    // ReadRowsRequest -> ReadRowsResponse callable).
+    // We override the resumption strategy to use LargeReadRowsResumptionStrategy here (which skips
+    // the large rows) instead of ReadRowsResumptionStrategy.
+    ServerStreamingCallSettings innerSettings =
+        ServerStreamingCallSettings.newBuilder()
+            .setResumptionStrategy(new LargeReadRowsResumptionStrategy<>(rowAdapter))
+            .setRetryableCodes(readRowsSettings.getRetryableCodes())
+            .setRetrySettings(readRowsSettings.getRetrySettings())
+            .setIdleTimeout(readRowsSettings.getIdleTimeout())
+            .setWaitTimeout(readRowsSettings.getWaitTimeout())
+            .build();
+
+    ServerStreamingCallable watched =
+        Callables.watched(merging, innerSettings, clientContext);
+
+    ServerStreamingCallable withBigtableTracer =
+        new BigtableTracerStreamingCallable<>(watched);
+
+    // Retry logic is split into 2 parts to work around a rare edge case described in
+    // ReadRowsRetryCompletedCallable.
+    ServerStreamingCallable retrying1 =
+        new ReadRowsRetryCompletedCallable<>(withBigtableTracer);
+
+    ServerStreamingCallable retrying2 =
+        largeRowWithRetries(retrying1, innerSettings);
+
+    ServerStreamingCallable readRowsCallable =
+        new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
+
+    ServerStreamingCallable readRowsUserCallable =
+        new ReadRowsUserCallable<>(readRowsCallable, requestContext);
+
+    SpanName span = getSpanName("ReadRows");
+    ServerStreamingCallable traced =
+        new TracedServerStreamingCallable<>(
+            readRowsUserCallable, clientContext.getTracerFactory(), span);
+
+    return traced.withDefaultCallContext(
+        clientContext
+            .getDefaultCallContext()
+            .withRetrySettings(readRowsSettings.getRetrySettings()));
+  }
+
   /**
    * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
    * batcher. The chain will:
@@ -1282,6 +1378,22 @@ private ServerStreamingCallable withR
     return retrying;
   }
 
+  private ServerStreamingCallable largeRowWithRetries(
+      ServerStreamingCallable innerCallable,
+      ServerStreamingCallSettings serverStreamingCallSettings) {
+
+    // The retrying algorithm in retryingForLargeRows also takes RetryInfo into consideration, so
+    // we skip the check for settings.getEnableRetryInfo here.
+    ServerStreamingCallable retrying;
+    retrying =
+        com.google.cloud.bigtable.gaxx.retrying.Callables.retryingForLargeRows(
+            innerCallable, serverStreamingCallSettings, clientContext);
+    if (settings.getEnableRoutingCookie()) {
+      return new CookiesServerStreamingCallable<>(retrying);
+    }
+    return retrying;
+  }
+
   //
 
   //
@@ -1290,6 +1402,11 @@ public ServerStreamingCallable readRowsCallable() {
     return readRowsCallable;
   }
 
+  /** Returns a streaming read rows callable that skips large rows. */
+  public ServerStreamingCallable skipLargeRowsCallable() {
+    return skipLargeRowsCallable;
+  }
+
   /** Return a point read callable */
   public UnaryCallable readRowCallable() {
     return readRowCallable;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
new file mode 100644
index 0000000000..90f5958dc3
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiException; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsRequest.Builder; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.data.v2.internal.RowSetUtil; +import com.google.cloud.bigtable.data.v2.models.RowAdapter; +import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import java.util.Base64; +import java.util.logging.Logger; + +/** + * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks - + * + *
    + *
  • row key for the last row that was read successfully + *
  • row key for large-row that couldn't be read + *
  • list of all row keys for large-rows + *
+ * + * Upon retry this class builds a request to omit the large rows & retry from the last row key that + * was successfully read. + * + *

<p>This class is considered an internal implementation detail and not meant to be used by
+ * applications.
+ */
+@InternalApi
+public class LargeReadRowsResumptionStrategy<RowT>
+    extends BigtableStreamResumptionStrategy<ReadRowsRequest, RowT> {
+  private static final Logger LOGGER =
+      Logger.getLogger(LargeReadRowsResumptionStrategy.class.getName());
+  private final RowAdapter<RowT> rowAdapter;
+  private ByteString lastSuccessKey = ByteString.EMPTY;
+  // Number of rows processed excluding Marker row.
+  private long numProcessed;
+  private ByteString largeRowKey = ByteString.EMPTY;
+  // we modify the original request in the resumption strategy regardless of how many times it has
+  // failed, {@code previousFailedRequestRowset} is stored for the use case of continuous large rows
+  // row-keys
+  private RowSet previousFailedRequestRowset = null;
+
+  public LargeReadRowsResumptionStrategy(RowAdapter<RowT> rowAdapter) {
+    this.rowAdapter = rowAdapter;
+  }
+
+  @Override
+  public boolean canResume() {
+    return true;
+  }
+
+  @Override
+  public StreamResumptionStrategy<ReadRowsRequest, RowT> createNew() {
+    return new LargeReadRowsResumptionStrategy<>(rowAdapter);
+  }
+
+  @Override
+  public RowT processResponse(RowT response) {
+    // Last key can come from both the last processed row key and a synthetic row marker. The
+    // synthetic row marker is emitted when the server has read a lot of data that was filtered out.
+    // The row marker can be used to trim the start of the scan, but does not contribute to the row
+    // limit.
+    lastSuccessKey = rowAdapter.getKey(response);
+
+    if (!rowAdapter.isScanMarkerRow(response)) {
+      // Only real rows count towards the rows limit.
+      numProcessed++;
+    }
+    return response;
+  }
+
+  /**
+   * Remembers the key of a row that was reported as too large to read, so that the next resume
+   * request leaves it out. The skipped row is counted as processed.
+   *
+   * @return the throwable that was passed in, unchanged
+   */
+  @Override
+  public Throwable processError(Throwable throwable) {
+    ByteString rowKeyExtracted = extractLargeRowKey(throwable);
+    if (rowKeyExtracted != null) {
+      LOGGER.warning("skipping large row " + rowKeyExtracted);
+      this.largeRowKey = rowKeyExtracted;
+      numProcessed = numProcessed + 1;
+    }
+    return throwable;
+  }
+
+  /**
+   * Extracts the key of the large row from an {@link ApiException} whose reason is
+   * "LargeRowReadError".
+   *
+   * @return the decoded row key, or {@code null} if the throwable is not a large row error or
+   *     carries no "rowKeyBase64Encoded" metadata entry
+   */
+  private ByteString extractLargeRowKey(Throwable t) {
+    if (!(t instanceof ApiException)) {
+      return null;
+    }
+    ApiException apiException = (ApiException) t;
+    if (!"LargeRowReadError".equals(apiException.getReason())) {
+      return null;
+    }
+    // Without the row key there is nothing to skip. Decoding a missing entry would throw from
+    // inside the error handler and hide the original error.
+    if (apiException.getMetadata() == null) {
+      return null;
+    }
+    String rowKey = apiException.getMetadata().get("rowKeyBase64Encoded");
+    if (rowKey == null) {
+      return null;
+    }
+
+    byte[] decodedBytes = Base64.getDecoder().decode(rowKey);
+    return ByteString.copyFrom(decodedBytes);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This returns an updated request excluding all the rows keys & ranges till (including) {@link
+   * #lastSuccessKey} & also excludes the last encountered large row key ({@link #largeRowKey}).
+   * Also, this implementation takes care to update the row limit of the request to account for all
+   * of the received rows.
+   */
+  @Override
+  public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
+
+    // Nothing has been read or skipped so far, so resume with the original request object.
+    // previousFailedRequestRowset has to be checked too: largeRowKey is cleared on every resume,
+    // so a failure right after skipping a leading large row would otherwise re-send the original
+    // row set and hit (and count) the same large row again.
+    if (lastSuccessKey.isEmpty()
+        && largeRowKey.isEmpty()
+        && previousFailedRequestRowset == null) {
+      return originalRequest;
+    }
+
+    RowSet remaining;
+    if (previousFailedRequestRowset == null) {
+      remaining = originalRequest.getRows();
+    } else {
+      remaining = previousFailedRequestRowset;
+    }
+
+    if (!lastSuccessKey.isEmpty()) {
+      remaining = RowSetUtil.erase(remaining, lastSuccessKey, !originalRequest.getReversed());
+    }
+    // remaining is null once nothing is left to read; eraseLargeRow cannot take a null row set.
+    if (remaining != null && !largeRowKey.isEmpty()) {
+      remaining = RowSetUtil.eraseLargeRow(remaining, largeRowKey);
+    }
+    this.largeRowKey = ByteString.EMPTY;
+
+    previousFailedRequestRowset = remaining;
+
+    // Edge case: retrying a fulfilled request.
+    // A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
+    // had a row limit, has seen enough rows. These requests are replaced with a marker request that
+    // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable
+    // for more details.
+    if (remaining == null
+        || (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) {
+      return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER;
+    }
+
+    Builder builder = originalRequest.toBuilder().setRows(remaining);
+
+    if (originalRequest.getRowsLimit() > 0) {
+      Preconditions.checkState(
+          originalRequest.getRowsLimit() > numProcessed,
+          "Processed rows and number of large rows should not exceed the row limit in the original request");
+      builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed);
+    }
+
+    return builder.build();
+  }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
index a78e7643b0..3d696213a6 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java
@@ -73,4 +73,24 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
     return new RetryingServerStreamingCallable<>(
         innerCallable, retryingExecutor, settings.getResumptionStrategy());
   }
+
+  public static <RequestT, ResponseT>
+      ServerStreamingCallable<RequestT, ResponseT> retryingForLargeRows(
+          ServerStreamingCallable<RequestT, ResponseT> innerCallable,
+          ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
+          ClientContext clientContext) {
+
+    ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
+
+    StreamingRetryAlgorithm<Void> retryAlgorithm =
+        new StreamingRetryAlgorithm<>(
+            new LargeRowRetryAlgorithm<>(),
+            new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
+
+    ScheduledRetryingExecutor<Void> retryingExecutor =
+        new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
+
+    return new RetryingServerStreamingCallable<>(
+        innerCallable, retryingExecutor, settings.getResumptionStrategy());
+  }
 }
diff --git
a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
new file mode 100644
index 0000000000..ef4e8a5b45
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/LargeRowRetryAlgorithm.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.retrying;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
+import com.google.api.gax.retrying.RetryingContext;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import com.google.api.gax.rpc.ApiException;
+import com.google.protobuf.util.Durations;
+import com.google.rpc.RetryInfo;
+import javax.annotation.Nullable;
+
+/**
+ * This retry algorithm checks the metadata of an exception for additional error details. It also
+ * allows to retry for {@link com.google.api.gax.rpc.FailedPreconditionException} with
+ * ErrorDetails.Reason as "LargeRowReadError" (for large rows). If the metadata has a RetryInfo
+ * field, use the retry delay to set the wait time between attempts.
+ */
+@InternalApi
+public class LargeRowRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {
+
+  /**
+   * Builds the next attempt from the retry delay supplied by the server in RetryInfo.
+   *
+   * @return the previous settings with the server delay applied and both attempt counters
+   *     incremented, or {@code null} if the error carries no RetryInfo
+   */
+  @Override
+  public TimedAttemptSettings createNextAttempt(
+      Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
+    java.time.Duration retryDelay = extractRetryDelay(prevThrowable);
+    if (retryDelay != null) {
+      return prevSettings
+          .toBuilder()
+          .setRandomizedRetryDelayDuration(retryDelay)
+          .setAttemptCount(prevSettings.getAttemptCount() + 1)
+          // The overall count is a separate counter from the attempt count, so it has to be
+          // advanced from its own previous value.
+          .setOverallAttemptCount(prevSettings.getOverallAttemptCount() + 1)
+          .build();
+    }
+    return null;
+  }
+
+  /** Same as {@link #shouldRetry(RetryingContext, Throwable, Object)} without a context. */
+  @Override
+  public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
+    return shouldRetry(null, previousThrowable, previousResponse);
+  }
+
+  /**
+   * Returns true if the error carries a RetryInfo retry delay or is a large row error. Otherwise,
+   * if {@link RetryingContext#getRetryableCodes()} is not null, returns true if the status code of
+   * previousThrowable is in the list of retryable codes of the {@link RetryingContext}.
+   *
+   * <p>If neither applies, returns true if previousThrowable is an {@link ApiException} that is
+   * retryable.
+   */
+  @Override
+  public boolean shouldRetry(
+      @Nullable RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
+    if (extractRetryDelay(previousThrowable) != null) {
+      // First check if server wants us to retry
+      return true;
+    }
+    if (isLargeRowException(previousThrowable)) {
+      return true;
+    }
+    if (context != null && context.getRetryableCodes() != null) {
+      // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
+      // of codes that should be retried.
+      return ((previousThrowable instanceof ApiException)
+          && context
+              .getRetryableCodes()
+              .contains(((ApiException) previousThrowable).getStatusCode().getCode()));
+    }
+
+    // Server didn't have retry information and there's no retry context, use the local status
+    // code config.
+    return previousThrowable instanceof ApiException
+        && ((ApiException) previousThrowable).isRetryable();
+  }
+
+  /**
+   * Returns true if previousThrowable is an {@link ApiException} whose reason is
+   * "LargeRowReadError".
+   */
+  public boolean isLargeRowException(Throwable previousThrowable) {
+    // instanceof rejects null, and the constant-first equals tolerates a null reason.
+    return (previousThrowable instanceof ApiException)
+        && "LargeRowReadError".equals(((ApiException) previousThrowable).getReason());
+  }
+
+  /**
+   * Extracts the retry delay from the RetryInfo of an {@link ApiException}.
+   *
+   * @return the delay, or {@code null} if the throwable is not an {@link ApiException} or has no
+   *     error details or RetryInfo
+   */
+  static java.time.Duration extractRetryDelay(@Nullable Throwable throwable) {
+    if (throwable == null) {
+      return null;
+    }
+    if (!(throwable instanceof ApiException)) {
+      return null;
+    }
+    ApiException exception = (ApiException) throwable;
+    if (exception.getErrorDetails() == null) {
+      return null;
+    }
+    if (exception.getErrorDetails().getRetryInfo() == null) {
+      return null;
+    }
+    RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
+    return java.time.Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
+  }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java index 7ec29f8b77..7f5c39ec0a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java @@ -345,7 +345,6 @@ private void onAttemptError(Throwable throwable) { synchronized (lock) { localCancellationCause = cancellationCause; } - if (resumptionStrategy instanceof BigtableStreamResumptionStrategy) { throwable = ((BigtableStreamResumptionStrategy) resumptionStrategy).processError(throwable); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java index 4ccf9167f4..50abc2bcde 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/LargeRowIT.java @@ -16,27 +16,68 @@ package com.google.cloud.bigtable.data.v2.it; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.TruthJUnit.assume; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; +import com.google.cloud.bigtable.admin.v2.models.Table; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; 
import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.test_helpers.env.CloudEnv; +import com.google.cloud.bigtable.test_helpers.env.PrefixGenerator; import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; +import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class LargeRowIT { + private static final Logger logger = Logger.getLogger(LargeRowIT.class.getName()); @ClassRule public static final TestEnvRule testEnvRule = new TestEnvRule(); + private BigtableTableAdminClient tableAdminClient; + private Table table; + private String familyId = "cf"; + + @Before + public void setup() { + tableAdminClient = testEnvRule.env().getTableAdminClient(); + String tableId = PrefixGenerator.newPrefix("LargeRowTest"); + table = tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(familyId)); + } + + @After + public void tearDown() { + if (table != null) { + tableAdminClient.deleteTable(table.getId()); + } + } + @Test public void testWriteRead() throws Exception { String rowKey = UUID.randomUUID().toString(); @@ -73,4 +114,169 @@ public void testWriteRead() throws Exception { assertThat(row.getCells().get(0).getValue()).isEqualTo(largeValue); assertThat(row.getCells().get(1).getValue()).isEqualTo(largeValue); } + + static class AccumulatingObserver implements ResponseObserver { + + final List responses = 
Lists.newArrayList(); + final SettableApiFuture completionFuture = SettableApiFuture.create(); + + void awaitCompletion() throws Throwable { + try { + completionFuture.get(10, TimeUnit.MINUTES); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Override + public void onStart(StreamController controller) {} + + @Override + public void onResponse(Row row) { + responses.add(row); + } + + @Override + public void onError(Throwable t) { + completionFuture.setException(t); + } + + @Override + public void onComplete() { + completionFuture.set(null); + } + } + + // TODO: remove the ignore annotation once the server code (large row error with metadata) is + // released on prod + @Test + @Ignore("large-row-error with metadata yet to be released on prod") + public void read() throws Throwable { + assume() + .withMessage("Large row read errors are not supported by emulator") + .that(testEnvRule.env()) + .isInstanceOf(CloudEnv.class); + + BigtableDataClient client = testEnvRule.env().getDataClient(); + String tableId = table.getId(); + String familyId = this.familyId; + long timestampMicros = System.currentTimeMillis() * 1_000; + + // small row creations + client.bulkMutateRows( + BulkMutation.create(tableId) + .add( + RowMutationEntry.create("r1") + .setCell(familyId, "qualifier", timestampMicros, "my-value")) + .add( + RowMutationEntry.create("r4") + .setCell(familyId, "qualifier", timestampMicros, "my-value")) + .add( + RowMutationEntry.create("r5") + .setCell(familyId, "qualifier", timestampMicros, "my-value")) + .add( + RowMutationEntry.create("r6") + .setCell(familyId, "qualifier", timestampMicros, "my-value"))); + + Row expectedRow1 = + Row.create( + ByteString.copyFromUtf8("r1"), + ImmutableList.of( + RowCell.create( + familyId, + ByteString.copyFromUtf8("qualifier"), + timestampMicros, + ImmutableList.of(), + ByteString.copyFromUtf8("my-value")))); + + Row expectedRow4 = + Row.create( + ByteString.copyFromUtf8("r4"), + ImmutableList.of( + 
RowCell.create( + familyId, + ByteString.copyFromUtf8("qualifier"), + timestampMicros, + ImmutableList.of(), + ByteString.copyFromUtf8("my-value")))); + + // large row creation + byte[] largeValueBytes = new byte[3 * 1024 * 1024]; + ByteString largeValue = ByteString.copyFrom(largeValueBytes); + + for (int i = 0; i < 100; i++) { + ByteString qualifier = ByteString.copyFromUtf8("qualifier1_" + "_" + String.valueOf(i)); + client.mutateRow( + RowMutation.create(TableId.of(tableId), "r2").setCell(familyId, qualifier, largeValue)); + client.mutateRow( + RowMutation.create(TableId.of(tableId), "r3").setCell(familyId, qualifier, largeValue)); + } + + // sync + assertThat( + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r1").endOpen("r3")))) + .containsExactly(expectedRow1); + + assertThat( + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4")))) + .containsExactly(expectedRow1, expectedRow4); + + List emptyRows = + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r2").endClosed("r3"))); + assertThat(emptyRows).isEmpty(); + + List startWithFaultyRow = + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r2").endClosed("r4"))); + assertThat(startWithFaultyRow).containsExactly(expectedRow4); + + List endsWithFaultyRow = + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r3"))); + assertThat(endsWithFaultyRow).containsExactly(expectedRow1); + + assertThat( + client + .skipLargeRowsCallable() + .all() + .call( + Query.create(tableId) + .range(ByteStringRange.unbounded().startClosed("r1").endClosed("r4")))) + .containsExactly(expectedRow1, expectedRow4); + // async 
+ AccumulatingObserver observer = new AccumulatingObserver(); + Query query = Query.create(tableId).range("r1", "r3"); + client.skipLargeRowsCallable().call(query, observer); + observer.awaitCompletion(); + assertThat(observer.responses).containsExactly(expectedRow1); + + AccumulatingObserver observer2 = new AccumulatingObserver(); + Query query2 = Query.create(tableId).range("r1", "r5"); + client.skipLargeRowsCallable().call(query2, observer2); + observer2.awaitCompletion(); + assertThat(observer2.responses).containsExactly(expectedRow1, expectedRow4); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java index 8a8c6d7709..3ff77a3f5d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -19,35 +19,45 @@ import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.UnavailableException; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.ReadRowsResponse.CellChunk; import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.Query; import 
com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.truth.Truth; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; +import com.google.rpc.ErrorInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; import java.io.IOException; +import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -60,6 +70,8 @@ public class ReadRowsRetryTest { private static final String PROJECT_ID = "fake-project"; private static final String INSTANCE_ID = "fake-instance"; private static final String TABLE_ID = "fake-table"; + private static final Metadata.Key ERROR_DETAILS_KEY = + Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER); @Rule public GrpcServerRule serverRule = new GrpcServerRule(); private TestBigtableService service; @@ -124,6 +136,203 @@ public void immediateRetryTest() { Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); } + public ApiException createLargeRowException(String rowKey) { + ErrorInfo errorInfo = + ErrorInfo.newBuilder() + .setReason("LargeRowReadError") + .setDomain("bigtable.googleapis.com") + .putMetadata( + "rowKeyBase64Encoded", Base64.getEncoder().encodeToString(rowKey.getBytes())) + .build(); + + Any packedErrorInfo = Any.pack(errorInfo); + + ErrorDetails errorDetails = + 
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(packedErrorInfo)).build(); + + Metadata trailers = new Metadata(); + byte[] status = + com.google.rpc.Status.newBuilder().addDetails(Any.pack(errorInfo)).build().toByteArray(); + trailers.put(ERROR_DETAILS_KEY, status); + return (new UnavailableException( + new StatusRuntimeException(Status.FAILED_PRECONDITION, trailers), + GrpcStatusCode.of(Code.FAILED_PRECONDITION), + false, + errorDetails)); + } + + @Test + public void readRowsWithLimitSkippingLargeRowsTest() { + // Large rows is r2 for range r1 to r8 + ApiException largeRowExceptionWithTrailersR2 = createLargeRowException("r2"); + + List> rangeList; + List actualResults; + + // TEST - range end is large row || row limit + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRowLimit(2) + .respondWith("r1") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2)); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r3").limit(2)); + Truth.assertThat(actualResults).containsExactly("r1").inOrder(); + + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r4", "r7")) + .expectRowLimit(2) + .respondWith("r4", "r5")); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r4", "r7").limit(2)); + Truth.assertThat(actualResults).containsExactly("r4", "r5").inOrder(); + } + + @Test + public void readRowsForRowKeyWithLargeRowsTest() { + // Large rows is r2 for range r1 to r8 + ApiException largeRowExceptionWithTrailersR7 = createLargeRowException("r7"); + List actualResults; + + service.expectations.add( + RpcExpectation.create() + .expectRequest("r1", "r7", "r4", "r8") + .respondWith("r1", "r4") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR7)); + service.expectations.add(RpcExpectation.create().expectRequest("r8").respondWith("r8")); + + actualResults = + getSkipLargeRowsResults( + 
Query.create(TABLE_ID).rowKey("r1").rowKey("r7").rowKey("r4").rowKey("r8")); + Truth.assertThat(actualResults).containsExactly("r1", "r4", "r8").inOrder(); + } + + /** + * This tests if in a read rows request RowRange includes large rows, those rows are omitted in + * the response. + */ + @Test + public void readRowRangeWithSkippingLargeRows() { + + // Large rows are r2, r3,r7 from r1 to r8 + ApiException largeRowExceptionWithTrailersR2 = createLargeRowException("r2"); + ApiException largeRowExceptionWithTrailersR3 = createLargeRowException("r3"); + ApiException largeRowExceptionWithTrailersR7 = createLargeRowException("r7"); + + List> rangeList; + List actualResults; + + // TEST - only query for large rows - should receive an empty response + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r2", "r4")) + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r2", "r4")) + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3)); + + rangeList = new ArrayList>(); + rangeList.add(Range.open("r2", "r3")); + rangeList.add(Range.open("r3", "r4")); + service.expectations.add( + RpcExpectation.create() + .expectRequestForMultipleRowRanges(rangeList) + .respondWithStatus(Code.OK)); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r2", "r4")); + Truth.assertThat(actualResults.size()).isEqualTo(0); + + // TEST - range start is large row + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r3", "r5")) + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3)); + + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r3", "r5")).respondWith("r4")); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r3", "r5")); + Truth.assertThat(actualResults).containsExactly("r4").inOrder(); + + // TEST - range end is 
large row + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("r1") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2)); + + rangeList = new ArrayList>(); + rangeList.add(Range.open("r1", "r2")); + rangeList.add(Range.open("r2", "r3")); + service.expectations.add( + RpcExpectation.create() + .expectRequestForMultipleRowRanges(rangeList) + .respondWithStatus(Code.OK)); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("r1").inOrder(); + + // r2 faulty + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r9")) + .respondWith("r1") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR2)); + + // r3 faulty + rangeList = new ArrayList>(); + rangeList.add(Range.open("r1", "r2")); + rangeList.add(Range.open("r2", "r9")); + service.expectations.add( + RpcExpectation.create() + .expectRequestForMultipleRowRanges(rangeList) + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3)); + + rangeList = new ArrayList>(); + rangeList.add(Range.open("r1", "r2")); + rangeList.add(Range.open("r2", "r3")); + rangeList.add(Range.open("r3", "r9")); + service.expectations.add( + RpcExpectation.create() + .expectRequestForMultipleRowRanges(rangeList) + .respondWith("r4", "r5") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR7)); + + rangeList = new ArrayList>(); + rangeList.add(Range.open("r5", "r7")); + rangeList.add(Range.open("r7", "r9")); + + service.expectations.add( + RpcExpectation.create() + .expectRequestForMultipleRowRanges(rangeList) + .respondWith("r6", "r8")); + + actualResults = getSkipLargeRowsResults(Query.create(TABLE_ID).range("r1", "r9")); + Truth.assertThat(actualResults).containsExactly("r1", "r4", "r5", "r6", "r8").inOrder(); + + // TEST - reverse query with large rows + service.expectations.add( + 
RpcExpectation.create() + .expectRequest(Range.closedOpen("r3", "r7")) + .setReversed(true) + .respondWith("r6", "r5", "r4") + .respondWithException(Code.INTERNAL, largeRowExceptionWithTrailersR3)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r3", "r4")) + .setReversed(true) + .respondWithStatus(Code.OK)); + + actualResults = + getSkipLargeRowsResults(Query.create(TABLE_ID).range("r3", "r7").reversed(true)); + Truth.assertThat(actualResults).containsExactly("r6", "r5", "r4").inOrder(); + } + @Test public void multipleRetryTest() { service.expectations.add( @@ -299,6 +508,15 @@ private List getResults(Query query) { return actualValues; } + private List getSkipLargeRowsResults(Query query) { + List actualRowKeys = + client.skipLargeRowsCallable().all().call(query).stream() + .map(row -> row.getKey().toStringUtf8()) + .collect(Collectors.toList()); + + return actualRowKeys; + } + private static class TestBigtableService extends BigtableGrpc.BigtableImplBase { Queue expectations = new LinkedBlockingDeque<>(); int i = -1; @@ -336,6 +554,11 @@ private static class RpcExpectation { ApiException exception; List responses; + private RpcExpectation setReversed(boolean reverse) { + this.requestBuilder.setReversed(reverse); + return this; + } + private RpcExpectation() { this.requestBuilder = ReadRowsRequest.newBuilder() @@ -355,6 +578,58 @@ RpcExpectation expectRequest(String... 
keys) { return this; } + RpcExpectation expectRequestForMultipleRowRanges(List> rowRanges) { + RowSet.Builder rowRange = requestBuilder.getRowsBuilder(); + for (Range range : rowRanges) { + rowRangeBuilder(range); + } + return this; + } + + /** + * Build Row Range + * + * @param range + * @return + */ + RowRange rowRangeBuilder(Range range) { + + RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder(); + + if (range.hasLowerBound()) { + switch (range.lowerBoundType()) { + case CLOSED: + rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + case OPEN: + rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected lowerBoundType: " + range.lowerBoundType()); + } + } else { + rowRange.clearStartKey(); + } + + if (range.hasUpperBound()) { + switch (range.upperBoundType()) { + case CLOSED: + rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + case OPEN: + rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected upperBoundType: " + range.upperBoundType()); + } + } else { + rowRange.clearEndKey(); + } + return rowRange.build(); + } + RpcExpectation expectRequest(Range range) { RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder();