Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry options to BigQuery #431

Merged
merged 2 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .prow/scripts/test-end-to-end-batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -e
set -o pipefail

if ! cat /etc/*release | grep -q stretch; then
echo ${BASH_SOURCE} only supports Debian stretch.
echo ${BASH_SOURCE} only supports Debian stretch.
echo Please change your operating system to use this script.
exit 1
fi
Expand All @@ -16,7 +16,7 @@ This script will run end-to-end tests for Feast Core and Batch Serving.
2. Install Redis as the job store for Feast Batch Serving.
4. Install Postgres for persisting Feast metadata.
5. Install Kafka and Zookeeper as the Source in Feast.
6. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from
6. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from
tests/e2e via pytest.
"

Expand Down Expand Up @@ -185,6 +185,8 @@ feast:
jobs:
staging-location: gs://feast-templocation-kf-feast/staging-location
store-type: REDIS
bigquery-initial-retry-delay-secs: 1
bigquery-total-timeout-secs: 900
store-options:
host: $REMOTE_HOST
port: 6379
Expand Down
18 changes: 18 additions & 0 deletions serving/src/main/java/feast/serving/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,23 @@ public void setRedisPoolMaxIdle(int redisPoolMaxIdle) {

public static class JobProperties {
private String stagingLocation;
private int bigqueryInitialRetryDelaySecs;
private int bigqueryTotalTimeoutSecs;
private String storeType;
private Map<String, String> storeOptions;

public String getStagingLocation() {
return this.stagingLocation;
}

/** Returns the initial delay, in seconds, between polls while waiting on a BigQuery job. */
public int getBigqueryInitialRetryDelaySecs() {
return bigqueryInitialRetryDelaySecs;
}

/** Returns the total timeout, in seconds, to wait for a BigQuery job before giving up. */
public int getBigqueryTotalTimeoutSecs() {
return bigqueryTotalTimeoutSecs;
}

/** Returns the type of store used to persist job metadata (e.g. REDIS). */
public String getStoreType() {
return this.storeType;
}
Expand All @@ -132,6 +142,14 @@ public void setStagingLocation(String stagingLocation) {
this.stagingLocation = stagingLocation;
}

/** Sets the initial delay, in seconds, between polls while waiting on a BigQuery job. */
public void setBigqueryInitialRetryDelaySecs(int bigqueryInitialRetryDelaySecs) {
this.bigqueryInitialRetryDelaySecs = bigqueryInitialRetryDelaySecs;
}

/** Sets the total timeout, in seconds, to wait for a BigQuery job before giving up. */
public void setBigqueryTotalTimeoutSecs(int bigqueryTotalTimeoutSecs) {
this.bigqueryTotalTimeoutSecs = bigqueryTotalTimeoutSecs;
}

/** Sets the type of store used to persist job metadata. */
public void setStoreType(String storeType) {
this.storeType = storeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public ServingService servingService(
specService,
jobService,
jobStagingLocation,
feastProperties.getJobs().getBigqueryInitialRetryDelaySecs(),
feastProperties.getJobs().getBigqueryTotalTimeoutSecs(),
storage);
break;
case CASSANDRA:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName;
import static feast.serving.util.Metrics.requestLatency;

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
Expand Down Expand Up @@ -57,12 +58,12 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.threeten.bp.Duration;

public class BigQueryServingService implements ServingService {

public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis();
public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.ofDays(1).toMillis();
private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class);

private final BigQuery bigquery;
Expand All @@ -71,6 +72,8 @@ public class BigQueryServingService implements ServingService {
private final CachedSpecService specService;
private final JobService jobService;
private final String jobStagingLocation;
private final int initialRetryDelaySecs;
private final int totalTimeoutSecs;
private final Storage storage;

public BigQueryServingService(
Expand All @@ -80,13 +83,17 @@ public BigQueryServingService(
CachedSpecService specService,
JobService jobService,
String jobStagingLocation,
int initialRetryDelaySecs,
int totalTimeoutSecs,
Storage storage) {
this.bigquery = bigquery;
this.projectId = projectId;
this.datasetId = datasetId;
this.specService = specService;
this.jobService = jobService;
this.jobStagingLocation = jobStagingLocation;
this.initialRetryDelaySecs = initialRetryDelaySecs;
this.totalTimeoutSecs = totalTimeoutSecs;
this.storage = storage;
}

Expand Down Expand Up @@ -156,6 +163,8 @@ public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeat
.setEntityTableColumnNames(entityNames)
.setFeatureSetInfos(featureSetInfos)
.setJobStagingLocation(jobStagingLocation)
.setInitialRetryDelaySecs(initialRetryDelaySecs)
.setTotalTimeoutSecs(totalTimeoutSecs)
.build())
.start();

Expand Down Expand Up @@ -199,7 +208,7 @@ private Table loadEntities(DatasetSource datasetSource) {
loadJobConfiguration =
loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
job.waitFor();
waitForJob(job);

TableInfo expiry =
bigquery
Expand Down Expand Up @@ -239,15 +248,15 @@ private TableId generateUUIDs(Table loadedEntityTable) {
.setDestinationTable(TableId.of(projectId, datasetId, createTempTableName()))
.build();
Job queryJob = bigquery.create(JobInfo.of(queryJobConfig));
queryJob.waitFor();
Job completedJob = waitForJob(queryJob);
TableInfo expiry =
bigquery
.getTable(queryJobConfig.getDestinationTable())
.toBuilder()
.setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
.build();
bigquery.update(expiry);
queryJobConfig = queryJob.getConfiguration();
queryJobConfig = completedJob.getConfiguration();
return queryJobConfig.getDestinationTable();
} catch (InterruptedException | BigQueryException e) {
throw Status.INTERNAL
Expand All @@ -257,6 +266,22 @@ private TableId generateUUIDs(Table loadedEntityTable) {
}
}

/**
 * Blocks until the given BigQuery job completes, polling with the configured
 * initial retry delay and giving up after the configured total timeout.
 *
 * @param queryJob the BigQuery job to wait on
 * @return the completed job, whose configuration can be inspected by the caller
 * @throws InterruptedException if the waiting thread is interrupted
 */
private Job waitForJob(Job queryJob) throws InterruptedException {
  final Job finished =
      queryJob.waitFor(
          RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
          RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
  // waitFor returns null when the job can no longer be found server-side.
  if (finished == null) {
    throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
  }
  // A non-null error on the completed job's status means the job itself failed.
  if (finished.getStatus().getError() != null) {
    throw Status.INTERNAL
        .withDescription("Job failed: " + finished.getStatus().getError())
        .asRuntimeException();
  }
  return finished;
}

/**
 * Generates a unique temporary table name: an underscore followed by a
 * random UUID with its dashes removed.
 *
 * @return a 33-character identifier safe for use as a BigQuery table name
 */
public static String createTempTableName() {
  String randomSuffix = UUID.randomUUID().toString().replace("-", "");
  return "_".concat(randomSuffix);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery;

import com.google.auto.value.AutoValue;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.threeten.bp.Duration;

@AutoValue
public abstract class BatchRetrievalQueryRunnable implements Runnable {
Expand All @@ -75,6 +77,10 @@ public abstract class BatchRetrievalQueryRunnable implements Runnable {

public abstract String jobStagingLocation();

public abstract int initialRetryDelaySecs();

public abstract int totalTimeoutSecs();

public abstract Storage storage();

public static Builder builder() {
Expand All @@ -101,6 +107,10 @@ public abstract static class Builder {

public abstract Builder setJobStagingLocation(String jobStagingLocation);

public abstract Builder setInitialRetryDelaySecs(int initialRetryDelaySecs);

public abstract Builder setTotalTimeoutSecs(int totalTimeoutSecs);

public abstract Builder setStorage(Storage storage);

public abstract BatchRetrievalQueryRunnable build();
Expand All @@ -126,7 +136,7 @@ public void run() {
ExtractJobConfiguration.of(
queryConfig.getDestinationTable(), exportTableDestinationUri, "Avro");
Job extractJob = bigquery().create(JobInfo.of(extractConfig));
extractJob.waitFor();
waitForJob(extractJob);
} catch (BigQueryException | InterruptedException | IOException e) {
jobService()
.upsert(
Expand Down Expand Up @@ -174,7 +184,6 @@ private List<String> parseOutputFileURIs() {

Job runBatchQuery(List<String> featureSetQueries)
throws BigQueryException, InterruptedException, IOException {
Job queryJob;
ExecutorService executorService = Executors.newFixedThreadPool(featureSetQueries.size());
ExecutorCompletionService<FeatureSetInfo> executorCompletionService =
new ExecutorCompletionService<>(executorService);
Expand Down Expand Up @@ -225,8 +234,8 @@ Job runBatchQuery(List<String> featureSetQueries)
QueryJobConfiguration.newBuilder(joinQuery)
.setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
.build();
queryJob = bigquery().create(JobInfo.of(queryJobConfig));
queryJob.waitFor();
Job queryJob = bigquery().create(JobInfo.of(queryJobConfig));
Job completedQueryJob = waitForJob(queryJob);

TableInfo expiry =
bigquery()
Expand All @@ -236,7 +245,7 @@ Job runBatchQuery(List<String> featureSetQueries)
.build();
bigquery().update(expiry);

return queryJob;
return completedQueryJob;
}

private List<String> generateQueries(FieldValueList timestampLimits) {
Expand Down Expand Up @@ -270,7 +279,7 @@ private FieldValueList getTimestampLimits(String entityTableName) {
.build();
try {
Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults();
TableResult getTimestampLimitsQueryResult = waitForJob(job).getQueryResults();
TableInfo expiry =
bigquery()
.getTable(getTimestampLimitsQuery.getDestinationTable())
Expand All @@ -293,4 +302,21 @@ private FieldValueList getTimestampLimits(String entityTableName) {
.asRuntimeException();
}
}

/**
 * Blocks until the given BigQuery job completes, polling with the retry delay
 * and total timeout supplied via this runnable's builder properties.
 *
 * @param queryJob the BigQuery job to wait on
 * @return the completed job
 * @throws InterruptedException if the waiting thread is interrupted
 */
private Job waitForJob(Job queryJob) throws InterruptedException {
  final Job finished =
      queryJob.waitFor(
          RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
          RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
  // waitFor returns null when the job can no longer be found server-side.
  if (finished == null) {
    throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
  }
  // A non-null error on the completed job's status means the job itself failed.
  if (finished.getStatus().getError() != null) {
    throw Status.INTERNAL
        .withDescription("Job failed: " + finished.getStatus().getError())
        .asRuntimeException();
  }
  return finished;
}

}
7 changes: 6 additions & 1 deletion serving/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ feast:
redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16}

jobs:
# job-staging-location specifies the URI to store intermediate files for batch serving.
# staging-location specifies the URI to store intermediate files for batch serving.
# Feast Serving client is expected to have read access to this staging location
# to download the batch features.
#
# For example: gs://mybucket/myprefix
# Please omit the trailing slash in the URI.
staging-location: ${FEAST_JOB_STAGING_LOCATION:}
#
# Retry options for BigQuery jobs:
bigquery-initial-retry-delay-secs: 1
bigquery-total-timeout-secs: 21600
#
# Type of store to store job metadata. This only needs to be set if the
# serving store type is Bigquery.
store-type: ${FEAST_JOB_STORE_TYPE:}
Expand Down