fix(bigquery): Remove ReadAPI bypass in executeSelect() (#3624)
* fix(bigquery): Remove ReadAPI bypass in executeSelect() for fast query requests.

* Enable fast query and read API.

Move readAPI condition check from getExecuteSelectResponse() to
queryRpc(). This allows fast query to be used with ReadAPI.

* Check fast query results.getTotalRows() for null.

* Remove test file.

* Add internal query state to keep track of ReadAPI usage.

A Boolean field is added to keep track of whether or not the high
throughput ReadAPI is used. This is mostly for testing to avoid
another regression in the future.

* Move tests into IT test file

* Fix formatting changes. Again :/

* Add VisibleForTesting Annotation
whuffman36 authored Jan 15, 2025
1 parent 09c98bf commit fadd992
Showing 3 changed files with 89 additions and 8 deletions.
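
The user-visible effect of the fix: with ConnectionSettings that enable the Read API, executeSelect() results exceeding the configured minimum size are now served by the high throughput ReadAPI even when the fast-query path handled the request, and the new QueryStatistics flag exposes which path was taken. A minimal sketch of how a caller could verify this (the table and thresholds are illustrative, adapted from the integration tests below):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.BigQueryResult;
import com.google.cloud.bigquery.Connection;
import com.google.cloud.bigquery.ConnectionSettings;

public class ReadApiUsageCheck {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Illustrative thresholds: results with more than minResultSize rows are
    // eligible for the high throughput ReadAPI.
    ConnectionSettings settings =
        ConnectionSettings.newBuilder()
            .setUseReadAPI(true)
            .setMinResultSize(500)
            .setTotalToPageRowCountRatio(1)
            .build();
    Connection connection = bigquery.createConnection(settings);

    BigQueryResult result =
        connection.executeSelect(
            "SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017"
                + " LIMIT 5000");

    // New in this commit: QueryStatistics records whether the ReadAPI served the result.
    System.out.println(
        "ReadAPI used: "
            + result.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
    connection.close();
  }
}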
ConnectionImpl.java
@@ -476,22 +476,29 @@ private BigQueryResult queryRpc(
     }
 
     // Query finished running and we can paginate all the results
-    if (results.getJobComplete() && results.getSchema() != null) {
+    // Results should be read using the high throughput read API if sufficiently large.
+    boolean resultsLargeEnoughForReadApi =
+        connectionSettings.getUseReadAPI()
+            && results.getTotalRows() != null
+            && results.getTotalRows().longValue() > connectionSettings.getMinResultSize();
+    if (results.getJobComplete() && results.getSchema() != null && !resultsLargeEnoughForReadApi) {
       return processQueryResponseResults(results);
     } else {
-      // Query is long-running (> 10s) and hasn't completed yet, or query completed but didn't
-      // return the schema, fallback to jobs.insert path. Some operations don't return the schema
-      // and can be optimized here, but this is left as future work.
-      Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
-      Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
+      // Query is long-running (> 10s) and hasn't completed yet, query completed but didn't
+      // return the schema, or results are sufficiently large to use the high throughput read API,
+      // fallback to jobs.insert path. Some operations don't return the schema and can be optimized
+      // here, but this is left as future work.
+      JobId jobId = JobId.fromPb(results.getJobReference());
+      GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
+      Long totalRows =
+          firstPage.getTotalRows() == null ? null : firstPage.getTotalRows().longValue();
+      Long pageRows = firstPage.getRows() == null ? null : (long) (firstPage.getRows().size());
       logger.log(
           Level.WARNING,
           "\n"
               + String.format(
                   "results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s",
                   results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
-      JobId jobId = JobId.fromPb(results.getJobReference());
-      GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
       return getSubsequentQueryResultsWithJob(
           totalRows, pageRows, jobId, firstPage, hasQueryParameters);
     }
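
Restating the routing rule the hunk above introduces, since it is the heart of the fix (a sketch using the diff's own names; connectionSettings and results are the surrounding method's state, not redeclared here):

// A completed fast query with a schema is now paginated over REST only when it
// is too small for the ReadAPI; previously it was always paginated over REST,
// which bypassed the ReadAPI entirely.
boolean resultsLargeEnoughForReadApi =
    connectionSettings.getUseReadAPI()
        && results.getTotalRows() != null
        && results.getTotalRows().longValue() > connectionSettings.getMinResultSize();
if (results.getJobComplete() && results.getSchema() != null && !resultsLargeEnoughForReadApi) {
  // Small, finished result: paginate the REST response directly.
} else {
  // Long-running query, missing schema, or large result: fall back to the
  // jobs.insert path, which can hand off to the high throughput ReadAPI.
}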
@@ -996,6 +1003,7 @@ BigQueryResult highThroughPutRead(
           schema);
 
       logger.log(Level.INFO, "\n Using BigQuery Read API");
+      stats.getQueryStatistics().setUseReadApi(true);
       return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, bufferRow, stats);
 
     } catch (IOException e) {
JobStatistics.java
@@ -27,6 +27,7 @@
 import com.google.auto.value.AutoValue;
 import com.google.cloud.StringEnumType;
 import com.google.cloud.StringEnumValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
@@ -396,6 +397,7 @@ public static class QueryStatistics extends JobStatistics {
     private final BiEngineStats biEngineStats;
     private final Integer billingTier;
     private final Boolean cacheHit;
+    private Boolean useReadApi;
     private final String ddlOperationPerformed;
     private final TableId ddlTargetTable;
     private final RoutineId ddlTargetRoutine;
@@ -796,6 +798,7 @@ private QueryStatistics(Builder builder) {
       this.biEngineStats = builder.biEngineStats;
       this.billingTier = builder.billingTier;
       this.cacheHit = builder.cacheHit;
+      this.useReadApi = false;
       this.ddlOperationPerformed = builder.ddlOperationPerformed;
       this.ddlTargetTable = builder.ddlTargetTable;
       this.ddlTargetRoutine = builder.ddlTargetRoutine;
@@ -835,6 +838,18 @@ public Boolean getCacheHit() {
       return cacheHit;
     }
 
+    /** Returns whether the query result is read from the high throughput ReadAPI. */
+    @VisibleForTesting
+    public Boolean getUseReadApi() {
+      return useReadApi;
+    }
+
+    /** Sets internal state to reflect the use of the high throughput ReadAPI. */
+    @VisibleForTesting
+    public void setUseReadApi(Boolean useReadApi) {
+      this.useReadApi = useReadApi;
+    }
+
     /** [BETA] For DDL queries, returns the operation applied to the DDL target table. */
     public String getDdlOperationPerformed() {
       return ddlOperationPerformed;
ITBigQueryTest.java
@@ -3489,6 +3489,63 @@ public void testExecuteSelectDefaultConnectionSettings() throws SQLException {
     String query = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
     BigQueryResult bigQueryResult = connection.executeSelect(query);
     assertEquals(42, bigQueryResult.getTotalRows());
+    assertFalse(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
   }
 
+  @Test
+  public void testExecuteSelectWithReadApi() throws SQLException {
+    final int rowLimit = 5000;
+    final String QUERY =
+        "SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
+    // Job timeout is somewhat arbitrary - just ensures that fast query is not used.
+    // min result size and page row count ratio ensure that the ReadAPI is used.
+    ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
+        ConnectionSettings.newBuilder()
+            .setUseReadAPI(true)
+            .setJobTimeoutMs(Long.MAX_VALUE)
+            .setMinResultSize(500)
+            .setTotalToPageRowCountRatio(1)
+            .build();
+
+    Connection connectionReadAPIEnabled =
+        bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);
+
+    String selectQuery = String.format(QUERY, rowLimit);
+
+    BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
+    ResultSet rs = bigQueryResultSet.getResultSet();
+    // Paginate results to avoid an InterruptedException
+    while (rs.next()) {}
+
+    assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
+    connectionReadAPIEnabled.close();
+  }
+
+  @Test
+  public void testExecuteSelectWithFastQueryReadApi() throws SQLException {
+    final int rowLimit = 5000;
+    final String QUERY =
+        "SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
+    // min result size and page row count ratio ensure that the ReadAPI is used.
+    ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
+        ConnectionSettings.newBuilder()
+            .setUseReadAPI(true)
+            .setMinResultSize(500)
+            .setTotalToPageRowCountRatio(1)
+            .build();
+
+    Connection connectionReadAPIEnabled =
+        bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);
+
+    String selectQuery = String.format(QUERY, rowLimit);
+
+    BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
+    ResultSet rs = bigQueryResultSet.getResultSet();
+    // Paginate results to avoid an InterruptedException
+    while (rs.next()) {}
+
+    assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
+    connectionReadAPIEnabled.close();
+  }
+
   @Test
@@ -3540,6 +3597,7 @@ public void testExecuteSelectWithCredentials() throws SQLException {
             + TABLE_ID_LARGE.getTable(); // Large query result is needed to use BigQueryReadClient.
     BigQueryResult bigQueryResult = connectionGoodCredentials.executeSelect(query);
     assertEquals(313348, bigQueryResult.getTotalRows());
+    assertTrue(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
 
     // Scenario 2.
     // Create a new bigQuery object but explicitly an invalid credential.
