Skip to content

Commit

Permalink
Add retry options to BigQuery (#431)
Browse files Browse the repository at this point in the history
* Add retry options to BigQuery

Add two fields to job properties and use them to set retry options in all BigQuery job waitFor calls, as per Google example.

* Change timeout defaults and add config to e2e test to fix failure.
  • Loading branch information
Yanson authored and feast-ci-bot committed Jan 20, 2020
1 parent ba1c828 commit fe520a9
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 12 deletions.
2 changes: 2 additions & 0 deletions .prow/scripts/test-end-to-end-batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ feast:
jobs:
staging-location: ${JOBS_STAGING_LOCATION}
store-type: REDIS
bigquery-initial-retry-delay-secs: 1
bigquery-total-timeout-secs: 900
store-options:
host: localhost
port: 6379
Expand Down
18 changes: 18 additions & 0 deletions serving/src/main/java/feast/serving/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,23 @@ public void setRedisPoolMaxIdle(int redisPoolMaxIdle) {

public static class JobProperties {
private String stagingLocation;
private int bigqueryInitialRetryDelaySecs;
private int bigqueryTotalTimeoutSecs;
private String storeType;
private Map<String, String> storeOptions;

/**
 * Returns the URI (e.g. a GCS path) where intermediate files for batch
 * serving jobs are staged. May be empty if batch serving is not configured.
 */
public String getStagingLocation() {
return this.stagingLocation;
}

/**
 * Returns the initial delay, in seconds, between polls when waiting for a
 * BigQuery job to complete (used to build {@code RetryOption.initialRetryDelay}).
 */
public int getBigqueryInitialRetryDelaySecs() {
// Use this-qualified access for consistency with the other getters in this class.
return this.bigqueryInitialRetryDelaySecs;
}

/**
 * Returns the total time, in seconds, to wait for a BigQuery job before
 * giving up (used to build {@code RetryOption.totalTimeout}).
 */
public int getBigqueryTotalTimeoutSecs() {
// Use this-qualified access for consistency with the other getters in this class.
return this.bigqueryTotalTimeoutSecs;
}

/**
 * Returns the type of store used for job metadata (e.g. REDIS); only
 * required when the serving store type is BigQuery.
 */
public String getStoreType() {
return this.storeType;
}
Expand All @@ -132,6 +142,14 @@ public void setStagingLocation(String stagingLocation) {
this.stagingLocation = stagingLocation;
}

/**
 * Sets the initial delay, in seconds, between polls when waiting for a
 * BigQuery job to complete.
 */
public void setBigqueryInitialRetryDelaySecs(int bigqueryInitialRetryDelaySecs) {
this.bigqueryInitialRetryDelaySecs = bigqueryInitialRetryDelaySecs;
}

/**
 * Sets the total time, in seconds, to wait for a BigQuery job before
 * giving up.
 */
public void setBigqueryTotalTimeoutSecs(int bigqueryTotalTimeoutSecs) {
this.bigqueryTotalTimeoutSecs = bigqueryTotalTimeoutSecs;
}

/**
 * Sets the type of store used for job metadata; only required when the
 * serving store type is BigQuery.
 */
public void setStoreType(String storeType) {
this.storeType = storeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public ServingService servingService(
specService,
jobService,
jobStagingLocation,
feastProperties.getJobs().getBigqueryInitialRetryDelaySecs(),
feastProperties.getJobs().getBigqueryTotalTimeoutSecs(),
storage);
break;
case CASSANDRA:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName;
import static feast.serving.util.Metrics.requestLatency;

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
Expand Down Expand Up @@ -57,12 +58,12 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.threeten.bp.Duration;

public class BigQueryServingService implements ServingService {

public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis();
public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.ofDays(1).toMillis();
private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class);

private final BigQuery bigquery;
Expand All @@ -71,6 +72,8 @@ public class BigQueryServingService implements ServingService {
private final CachedSpecService specService;
private final JobService jobService;
private final String jobStagingLocation;
private final int initialRetryDelaySecs;
private final int totalTimeoutSecs;
private final Storage storage;

public BigQueryServingService(
Expand All @@ -80,13 +83,17 @@ public BigQueryServingService(
CachedSpecService specService,
JobService jobService,
String jobStagingLocation,
int initialRetryDelaySecs,
int totalTimeoutSecs,
Storage storage) {
this.bigquery = bigquery;
this.projectId = projectId;
this.datasetId = datasetId;
this.specService = specService;
this.jobService = jobService;
this.jobStagingLocation = jobStagingLocation;
this.initialRetryDelaySecs = initialRetryDelaySecs;
this.totalTimeoutSecs = totalTimeoutSecs;
this.storage = storage;
}

Expand Down Expand Up @@ -156,6 +163,8 @@ public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeat
.setEntityTableColumnNames(entityNames)
.setFeatureSetInfos(featureSetInfos)
.setJobStagingLocation(jobStagingLocation)
.setInitialRetryDelaySecs(initialRetryDelaySecs)
.setTotalTimeoutSecs(totalTimeoutSecs)
.build())
.start();

Expand Down Expand Up @@ -199,7 +208,7 @@ private Table loadEntities(DatasetSource datasetSource) {
loadJobConfiguration =
loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
job.waitFor();
waitForJob(job);

TableInfo expiry =
bigquery
Expand Down Expand Up @@ -239,15 +248,15 @@ private TableId generateUUIDs(Table loadedEntityTable) {
.setDestinationTable(TableId.of(projectId, datasetId, createTempTableName()))
.build();
Job queryJob = bigquery.create(JobInfo.of(queryJobConfig));
queryJob.waitFor();
Job completedJob = waitForJob(queryJob);
TableInfo expiry =
bigquery
.getTable(queryJobConfig.getDestinationTable())
.toBuilder()
.setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
.build();
bigquery.update(expiry);
queryJobConfig = queryJob.getConfiguration();
queryJobConfig = completedJob.getConfiguration();
return queryJobConfig.getDestinationTable();
} catch (InterruptedException | BigQueryException e) {
throw Status.INTERNAL
Expand All @@ -257,6 +266,22 @@ private TableId generateUUIDs(Table loadedEntityTable) {
}
}

/**
 * Blocks until the given BigQuery job completes, polling with the configured
 * initial retry delay and overall timeout.
 *
 * @param job the BigQuery job to wait on
 * @return the completed job
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws io.grpc.StatusRuntimeException (INTERNAL) if the job no longer
 *     exists or finished with an error
 */
private Job waitForJob(Job job) throws InterruptedException {
Job done =
    job.waitFor(
        RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
        RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
// waitFor returns null when the job can no longer be found server-side.
if (done == null) {
  throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
}
if (done.getStatus().getError() != null) {
  throw Status.INTERNAL
      .withDescription("Job failed: " + done.getStatus().getError())
      .asRuntimeException();
}
return done;
}

/**
 * Generates a unique BigQuery-safe temporary table name: a leading
 * underscore followed by a dash-free random UUID.
 */
public static String createTempTableName() {
String randomSuffix = UUID.randomUUID().toString().replace("-", "");
return "_" + randomSuffix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery;

import com.google.auto.value.AutoValue;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.threeten.bp.Duration;

/**
* BatchRetrievalQueryRunnable is a Runnable for running a BigQuery Feast batch retrieval job async.
Expand Down Expand Up @@ -96,6 +98,10 @@ public abstract class BatchRetrievalQueryRunnable implements Runnable {

public abstract String jobStagingLocation();

/** Initial delay, in seconds, between polls when waiting for a BigQuery job. */
public abstract int initialRetryDelaySecs();

/** Total time, in seconds, to wait for a BigQuery job before timing out. */
public abstract int totalTimeoutSecs();

public abstract Storage storage();

public static Builder builder() {
Expand All @@ -122,6 +128,10 @@ public abstract static class Builder {

public abstract Builder setJobStagingLocation(String jobStagingLocation);

/** Sets the initial delay, in seconds, between BigQuery job status polls. */
public abstract Builder setInitialRetryDelaySecs(int initialRetryDelaySecs);

/** Sets the total time, in seconds, to wait for a BigQuery job before timing out. */
public abstract Builder setTotalTimeoutSecs(int totalTimeoutSecs);

public abstract Builder setStorage(Storage storage);

public abstract BatchRetrievalQueryRunnable build();
Expand Down Expand Up @@ -151,7 +161,7 @@ public void run() {
ExtractJobConfiguration.of(
queryConfig.getDestinationTable(), exportTableDestinationUri, "Avro");
Job extractJob = bigquery().create(JobInfo.of(extractConfig));
extractJob.waitFor();
waitForJob(extractJob);
} catch (BigQueryException | InterruptedException | IOException e) {
jobService()
.upsert(
Expand Down Expand Up @@ -200,7 +210,6 @@ private List<String> parseOutputFileURIs() {

Job runBatchQuery(List<String> featureSetQueries)
throws BigQueryException, InterruptedException, IOException {
Job queryJob;
ExecutorService executorService = Executors.newFixedThreadPool(featureSetQueries.size());
ExecutorCompletionService<FeatureSetInfo> executorCompletionService =
new ExecutorCompletionService<>(executorService);
Expand Down Expand Up @@ -257,8 +266,8 @@ Job runBatchQuery(List<String> featureSetQueries)
QueryJobConfiguration.newBuilder(joinQuery)
.setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
.build();
queryJob = bigquery().create(JobInfo.of(queryJobConfig));
queryJob.waitFor();
Job queryJob = bigquery().create(JobInfo.of(queryJobConfig));
Job completedQueryJob = waitForJob(queryJob);

TableInfo expiry =
bigquery()
Expand All @@ -268,7 +277,7 @@ Job runBatchQuery(List<String> featureSetQueries)
.build();
bigquery().update(expiry);

return queryJob;
return completedQueryJob;
}

private List<String> generateQueries(FieldValueList timestampLimits) {
Expand Down Expand Up @@ -302,7 +311,7 @@ private FieldValueList getTimestampLimits(String entityTableName) {
.build();
try {
Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults();
TableResult getTimestampLimitsQueryResult = waitForJob(job).getQueryResults();
TableInfo expiry =
bigquery()
.getTable(getTimestampLimitsQuery.getDestinationTable())
Expand All @@ -325,4 +334,21 @@ private FieldValueList getTimestampLimits(String entityTableName) {
.asRuntimeException();
}
}

/**
 * Blocks until the given BigQuery job completes, using the configured retry
 * delay and total timeout from this runnable's properties.
 *
 * @param queryJob the BigQuery job to wait on
 * @return the completed job
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws io.grpc.StatusRuntimeException (INTERNAL) if the job no longer
 *     exists or finished with an error
 */
private Job waitForJob(Job queryJob) throws InterruptedException {
RetryOption delayOption =
    RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs()));
RetryOption timeoutOption =
    RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()));
Job completedJob = queryJob.waitFor(delayOption, timeoutOption);
// A null result means BigQuery no longer knows about the job.
if (completedJob == null) {
  throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
}
if (completedJob.getStatus().getError() != null) {
  throw Status.INTERNAL
      .withDescription("Job failed: " + completedJob.getStatus().getError())
      .asRuntimeException();
}
return completedJob;
}

}
7 changes: 6 additions & 1 deletion serving/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ feast:
redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16}

jobs:
# job-staging-location specifies the URI to store intermediate files for batch serving.
# staging-location specifies the URI to store intermediate files for batch serving.
# Feast Serving client is expected to have read access to this staging location
# to download the batch features.
#
# For example: gs://mybucket/myprefix
# Please omit the trailing slash in the URI.
staging-location: ${FEAST_JOB_STAGING_LOCATION:}
#
# Retry options for BigQuery jobs:
bigquery-initial-retry-delay-secs: 1
bigquery-total-timeout-secs: 21600
#
# Type of store to store job metadata. This only needs to be set if the
# serving store type is Bigquery.
store-type: ${FEAST_JOB_STORE_TYPE:}
Expand Down

0 comments on commit fe520a9

Please sign in to comment.