Skip to content

Commit

Permalink
feat: support readTime in Datastore query splitter. (#763)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](/~https://github.com/googleapis/java-datastore/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
/~https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
yixiaoshen authored Jun 21, 2022
1 parent 7358aa3 commit 61758e0
Show file tree
Hide file tree
Showing 8 changed files with 568 additions and 122 deletions.
9 changes: 9 additions & 0 deletions datastore-v1-proto-client/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://mojo.codehaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<className>com/google/datastore/v1/client/QuerySplitter</className>
<method>java.util.List getSplits(com.google.datastore.v1.Query, com.google.datastore.v1.PartitionId, int, com.google.datastore.v1.client.Datastore, com.google.protobuf.Timestamp)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
5 changes: 5 additions & 0 deletions datastore-v1-proto-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>

<!-- Test dependencies. -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package com.google.datastore.v1.client;

import com.google.api.core.BetaApi;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.protobuf.Timestamp;
import java.util.List;

/** Provides the ability to split a query into multiple shards. */
Expand All @@ -39,4 +41,16 @@ public interface QuerySplitter {
*/
List<Query> getSplits(Query query, PartitionId partition, int numSplits, Datastore datastore)
throws DatastoreException;

/**
* Same as {@link #getSplits(Query, PartitionId, int, Datastore)} but the splits are based on
* {@code readTime}, and the returned sharded {@link Query}s should also be executed with {@code
* readTime}. Reading from a timestamp is currently a private preview feature in Datastore.
*/
@BetaApi
default List<Query> getSplits(
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
throws DatastoreException {
throw new UnsupportedOperationException("Not implemented.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;

import com.google.api.core.BetaApi;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.Key;
Expand All @@ -29,11 +30,14 @@
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.QueryResultBatch.MoreResultsType;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.RunQueryRequest;
import com.google.protobuf.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;

/**
* Provides the ability to split a query into multiple shards using Cloud Datastore.
Expand Down Expand Up @@ -63,7 +67,24 @@ private QuerySplitterImpl() {
public List<Query> getSplits(
Query query, PartitionId partition, int numSplits, Datastore datastore)
throws DatastoreException, IllegalArgumentException {
return getSplitsInternal(query, partition, numSplits, datastore, null);
}

@BetaApi
@Override
public List<Query> getSplits(
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
throws DatastoreException, IllegalArgumentException {
return getSplitsInternal(query, partition, numSplits, datastore, readTime);
}

private List<Query> getSplitsInternal(
Query query,
PartitionId partition,
int numSplits,
Datastore datastore,
@Nullable Timestamp readTime)
throws DatastoreException, IllegalArgumentException {
List<Query> splits = new ArrayList<Query>(numSplits);
if (numSplits == 1) {
splits.add(query);
Expand All @@ -72,7 +93,7 @@ public List<Query> getSplits(
validateQuery(query);
validateSplitSize(numSplits);

List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore);
List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore, readTime);
Key lastKey = null;
for (Key nextKey : getSplitKey(scatterKeys, numSplits)) {
splits.add(createSplit(lastKey, nextKey, query));
Expand Down Expand Up @@ -182,23 +203,28 @@ private Query createSplit(Key lastKey, Key nextKey, Query query) {
* @param query the user query.
* @param partition the partition to run the query in.
* @param datastore the datastore containing the data.
* @param readTime read time at which to get the split keys from the datastore.
* @throws DatastoreException if there was an error when executing the datastore query.
*/
private List<Key> getScatterKeys(
int numSplits, Query query, PartitionId partition, Datastore datastore)
int numSplits,
Query query,
PartitionId partition,
Datastore datastore,
@Nullable Timestamp readTime)
throws DatastoreException {
Query.Builder scatterPointQuery = createScatterQuery(query, numSplits);

List<Key> keySplits = new ArrayList<Key>();

QueryResultBatch batch;
do {
RunQueryRequest scatterRequest =
RunQueryRequest.newBuilder()
.setPartitionId(partition)
.setQuery(scatterPointQuery)
.build();
batch = datastore.runQuery(scatterRequest).getBatch();
RunQueryRequest.Builder scatterRequest =
RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery);
if (readTime != null) {
scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build());
}
batch = datastore.runQuery(scatterRequest.build()).getBatch();
for (EntityResult result : batch.getEntityResultsList()) {
keySplits.add(result.getEntity().getKey());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.datastore.v1.client.testing;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequest;
import java.io.IOException;

/** Fake credential used for testing purpose. */
public class MockCredential extends Credential {
public MockCredential() {
super(
new AccessMethod() {
@Override
public void intercept(HttpRequest request, String accessToken) throws IOException {}

@Override
public String getAccessTokenFromRequest(HttpRequest request) {
return "MockAccessToken";
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.datastore.v1.client.testing;

import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpRequest;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.client.testing.util.TestableByteArrayInputStream;
import com.google.common.collect.Iterables;
import com.google.datastore.v1.client.DatastoreFactory;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.protobuf.Message;
import com.google.rpc.Code;
import com.google.rpc.Status;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

/** Fake Datastore factory used for testing purposes when a true Datastore service is not needed. */
public class MockDatastoreFactory extends DatastoreFactory {
private int nextStatus;
private Message nextResponse;
private Status nextError;
private IOException nextException;

private String lastPath;
private String lastMimeType;
private byte[] lastBody;
private List<String> lastCookies;
private String lastApiFormatHeaderValue;

public void setNextResponse(Message response) {
nextStatus = HttpStatusCodes.STATUS_CODE_OK;
nextResponse = response;
nextError = null;
nextException = null;
}

public void setNextError(int status, Code code, String message) {
nextStatus = status;
nextResponse = null;
nextError = makeErrorContent(message, code);
nextException = null;
}

public void setNextException(IOException exception) {
nextStatus = 0;
nextResponse = null;
nextError = null;
nextException = exception;
}

@Override
public HttpRequestFactory makeClient(DatastoreOptions options) {
HttpTransport transport =
new MockHttpTransport() {
@Override
public LowLevelHttpRequest buildRequest(String method, String url) {
return new MockLowLevelHttpRequest(url) {
@Override
public LowLevelHttpResponse execute() throws IOException {
lastPath = new GenericUrl(getUrl()).getRawPath();
lastMimeType = getContentType();
lastCookies = getHeaderValues("Cookie");
lastApiFormatHeaderValue =
Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version"));
ByteArrayOutputStream out = new ByteArrayOutputStream();
getStreamingContent().writeTo(out);
lastBody = out.toByteArray();
if (nextException != null) {
throw nextException;
}
MockLowLevelHttpResponse response =
new MockLowLevelHttpResponse()
.setStatusCode(nextStatus)
.setContentType("application/x-protobuf");
if (nextError != null) {
checkState(nextResponse == null);
response.setContent(new TestableByteArrayInputStream(nextError.toByteArray()));
} else {
response.setContent(new TestableByteArrayInputStream(nextResponse.toByteArray()));
}
return response;
}
};
}
};
Credential credential = options.getCredential();
return transport.createRequestFactory(credential);
}

public String getLastPath() {
return lastPath;
}

public String getLastMimeType() {
return lastMimeType;
}

public String getLastApiFormatHeaderValue() {
return lastApiFormatHeaderValue;
}

public byte[] getLastBody() {
return lastBody;
}

public List<String> getLastCookies() {
return lastCookies;
}

private static Status makeErrorContent(String message, Code code) {
return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build();
}
}
Loading

0 comments on commit 61758e0

Please sign in to comment.