Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support readTime in Datastore query splitter. #763

Merged
merged 6 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions datastore-v1-proto-client/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://mojo.codehaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<className>com/google/datastore/v1/client/QuerySplitter</className>
<method>java.util.List getSplits(com.google.datastore.v1.Query, com.google.datastore.v1.PartitionId, int, com.google.datastore.v1.client.Datastore, com.google.protobuf.Timestamp)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
5 changes: 5 additions & 0 deletions datastore-v1-proto-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>

<!-- Test dependencies. -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package com.google.datastore.v1.client;

import com.google.api.core.BetaApi;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.protobuf.Timestamp;
import java.util.List;

/** Provides the ability to split a query into multiple shards. */
Expand All @@ -39,4 +41,16 @@ public interface QuerySplitter {
*/
List<Query> getSplits(Query query, PartitionId partition, int numSplits, Datastore datastore)
throws DatastoreException;

/**
* Same as {@link #getSplits(Query, PartitionId, int, Datastore)} but the splits are based on
* {@code readTime}, and the returned sharded {@link Query}s should also be executed with {@code
* readTime}. Reading from a timestamp is currently a private preview feature in Datastore.
*/
@BetaApi
default List<Query> getSplits(
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
throws DatastoreException {
throw new UnsupportedOperationException("Not implemented.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;

import com.google.api.core.BetaApi;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.Key;
Expand All @@ -29,11 +30,14 @@
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.QueryResultBatch.MoreResultsType;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.RunQueryRequest;
import com.google.protobuf.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;

/**
* Provides the ability to split a query into multiple shards using Cloud Datastore.
Expand Down Expand Up @@ -63,7 +67,24 @@ private QuerySplitterImpl() {
public List<Query> getSplits(
Query query, PartitionId partition, int numSplits, Datastore datastore)
throws DatastoreException, IllegalArgumentException {
return getSplitsInternal(query, partition, numSplits, datastore, null);
}

@BetaApi
@Override
public List<Query> getSplits(
yixiaoshen marked this conversation as resolved.
Show resolved Hide resolved
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
throws DatastoreException, IllegalArgumentException {
return getSplitsInternal(query, partition, numSplits, datastore, readTime);
}

private List<Query> getSplitsInternal(
Query query,
PartitionId partition,
int numSplits,
Datastore datastore,
@Nullable Timestamp readTime)
throws DatastoreException, IllegalArgumentException {
List<Query> splits = new ArrayList<Query>(numSplits);
if (numSplits == 1) {
splits.add(query);
Expand All @@ -72,7 +93,7 @@ public List<Query> getSplits(
validateQuery(query);
validateSplitSize(numSplits);

List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore);
List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore, readTime);
Key lastKey = null;
for (Key nextKey : getSplitKey(scatterKeys, numSplits)) {
splits.add(createSplit(lastKey, nextKey, query));
Expand Down Expand Up @@ -182,23 +203,28 @@ private Query createSplit(Key lastKey, Key nextKey, Query query) {
* @param query the user query.
* @param partition the partition to run the query in.
* @param datastore the datastore containing the data.
* @param readTime read time at which to get the split keys from the datastore.
* @throws DatastoreException if there was an error when executing the datastore query.
*/
private List<Key> getScatterKeys(
int numSplits, Query query, PartitionId partition, Datastore datastore)
int numSplits,
Query query,
PartitionId partition,
Datastore datastore,
@Nullable Timestamp readTime)
throws DatastoreException {
Query.Builder scatterPointQuery = createScatterQuery(query, numSplits);

List<Key> keySplits = new ArrayList<Key>();

QueryResultBatch batch;
do {
RunQueryRequest scatterRequest =
RunQueryRequest.newBuilder()
.setPartitionId(partition)
.setQuery(scatterPointQuery)
.build();
batch = datastore.runQuery(scatterRequest).getBatch();
RunQueryRequest.Builder scatterRequest =
RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery);
if (readTime != null) {
scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build());
}
batch = datastore.runQuery(scatterRequest.build()).getBatch();
for (EntityResult result : batch.getEntityResultsList()) {
keySplits.add(result.getEntity().getKey());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.datastore.v1.client.testing;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequest;
import java.io.IOException;

/** Fake credential used for testing purpose. */
public class MockCredential extends Credential {
public MockCredential() {
super(
new AccessMethod() {
@Override
public void intercept(HttpRequest request, String accessToken) throws IOException {}

@Override
public String getAccessTokenFromRequest(HttpRequest request) {
return "MockAccessToken";
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.datastore.v1.client.testing;

import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpRequest;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.client.testing.util.TestableByteArrayInputStream;
import com.google.common.collect.Iterables;
import com.google.datastore.v1.client.DatastoreFactory;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.protobuf.Message;
import com.google.rpc.Code;
import com.google.rpc.Status;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

/** Fake Datastore factory used for testing purposes when a true Datastore service is not needed. */
public class MockDatastoreFactory extends DatastoreFactory {
private int nextStatus;
private Message nextResponse;
private Status nextError;
private IOException nextException;

private String lastPath;
private String lastMimeType;
private byte[] lastBody;
private List<String> lastCookies;
private String lastApiFormatHeaderValue;

public void setNextResponse(Message response) {
nextStatus = HttpStatusCodes.STATUS_CODE_OK;
nextResponse = response;
nextError = null;
nextException = null;
}

public void setNextError(int status, Code code, String message) {
nextStatus = status;
nextResponse = null;
nextError = makeErrorContent(message, code);
nextException = null;
}

public void setNextException(IOException exception) {
nextStatus = 0;
nextResponse = null;
nextError = null;
nextException = exception;
}

@Override
public HttpRequestFactory makeClient(DatastoreOptions options) {
HttpTransport transport =
new MockHttpTransport() {
@Override
public LowLevelHttpRequest buildRequest(String method, String url) {
return new MockLowLevelHttpRequest(url) {
@Override
public LowLevelHttpResponse execute() throws IOException {
lastPath = new GenericUrl(getUrl()).getRawPath();
lastMimeType = getContentType();
lastCookies = getHeaderValues("Cookie");
lastApiFormatHeaderValue =
Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version"));
ByteArrayOutputStream out = new ByteArrayOutputStream();
getStreamingContent().writeTo(out);
lastBody = out.toByteArray();
if (nextException != null) {
throw nextException;
}
MockLowLevelHttpResponse response =
new MockLowLevelHttpResponse()
.setStatusCode(nextStatus)
.setContentType("application/x-protobuf");
if (nextError != null) {
checkState(nextResponse == null);
response.setContent(new TestableByteArrayInputStream(nextError.toByteArray()));
} else {
response.setContent(new TestableByteArrayInputStream(nextResponse.toByteArray()));
}
return response;
}
};
}
};
Credential credential = options.getCredential();
return transport.createRequestFactory(credential);
}

public String getLastPath() {
return lastPath;
}

public String getLastMimeType() {
return lastMimeType;
}

public String getLastApiFormatHeaderValue() {
return lastApiFormatHeaderValue;
}

public byte[] getLastBody() {
return lastBody;
}

public List<String> getLastCookies() {
return lastCookies;
}

private static Status makeErrorContent(String message, Code code) {
return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build();
}
}
Loading