Skip to content

Commit

Permalink
chore: add internal option for statement executor type (googleapis#3534)
Browse files Browse the repository at this point in the history
The Connection API by default uses either a platform thread or a virtual thread
for each connection to execute and control the statements of that connection. This
is used to enable asynchronous execution of statements and allows a statement to
be cancelled by just interrupting this thread. Both these use cases are however
not (or only very rarely) used by the most common users of the Connection API;
the JDBC driver and PGAdapter. PGAdapter uses the PostgreSQL wire-protocol, which
by design is synchronous, and JDBC is also a synchronous API. The latter has a
cancel() method that currently requires this threading model, but this can be
modified in the JDBC driver.

Using a direct executor instead of a single-threaded executor per connection can
save one thread per connection.

The option is intentionally made package-private, so the above-mentioned
frameworks can set it by default without it becoming part of the public API.
  • Loading branch information
olavloite authored and sagnghos committed Dec 11, 2024
1 parent 1b18f06 commit 47e8407
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.connection.ConnectionProperty.Context;
import com.google.cloud.spanner.connection.ConnectionState.Type;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
Expand Down Expand Up @@ -284,9 +285,17 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
Preconditions.checkNotNull(options);
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
StatementExecutorType statementExecutorType;
if (options.getStatementExecutorType() != null) {
statementExecutorType = options.getStatementExecutorType();
} else {
statementExecutorType =
options.isUseVirtualThreads()
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.DIRECT_EXECUTOR;
}
this.statementExecutor =
new StatementExecutor(
options.isUseVirtualThreads(), options.getStatementExecutionInterceptors());
new StatementExecutor(statementExecutorType, options.getStatementExecutionInterceptors());
this.spannerPool = SpannerPool.INSTANCE;
this.options = options;
this.spanner = spannerPool.getSpanner(options, this);
Expand Down Expand Up @@ -330,7 +339,11 @@ && getDialect() == Dialect.POSTGRESQL
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
this.statementExecutor =
new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList());
new StatementExecutor(
options.isUseVirtualThreads()
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.DIRECT_EXECUTOR,
Collections.emptyList());
this.spannerPool = Preconditions.checkNotNull(spannerPool);
this.options = Preconditions.checkNotNull(options);
this.spanner = spannerPool.getSpanner(options, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -614,6 +615,7 @@ public static class Builder {
new HashMap<>();
private String uri;
private Credentials credentials;
private StatementExecutorType statementExecutorType;
private SessionPoolOptions sessionPoolOptions;
private List<StatementExecutionInterceptor> statementExecutionInterceptors =
Collections.emptyList();
Expand Down Expand Up @@ -787,6 +789,11 @@ Builder setCredentials(Credentials credentials) {
return this;
}

Builder setStatementExecutorType(StatementExecutorType statementExecutorType) {
this.statementExecutorType = statementExecutorType;
return this;
}

public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
return this;
Expand Down Expand Up @@ -824,6 +831,7 @@ public static Builder newBuilder() {
private final String instanceId;
private final String databaseName;
private final Credentials credentials;
private final StatementExecutorType statementExecutorType;
private final SessionPoolOptions sessionPoolOptions;

private final OpenTelemetry openTelemetry;
Expand Down Expand Up @@ -851,6 +859,7 @@ private ConnectionOptions(Builder builder) {
ConnectionPropertyValue<Boolean> value = cast(connectionPropertyValues.get(LENIENT.getKey()));
this.warnings = checkValidProperties(value != null && value.getValue(), uri);
this.fixedCredentials = builder.credentials;
this.statementExecutorType = builder.statementExecutorType;

this.openTelemetry = builder.openTelemetry;
this.statementExecutionInterceptors =
Expand Down Expand Up @@ -1128,6 +1137,10 @@ CredentialsProvider getCredentialsProvider() {
return getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER);
}

StatementExecutorType getStatementExecutorType() {
return this.statementExecutorType;
}

/** The {@link SessionPoolOptions} of this {@link ConnectionOptions}. */
public SessionPoolOptions getSessionPoolOptions() {
return sessionPoolOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ java.time.Duration asDuration() {
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", false);

/** Creates an {@link ExecutorService} for a {@link StatementExecutor}. */
private static ListeningExecutorService createExecutorService(boolean useVirtualThreads) {
private static ListeningExecutorService createExecutorService(StatementExecutorType type) {
if (type == StatementExecutorType.DIRECT_EXECUTOR) {
return MoreExecutors.newDirectExecutorService();
}
return MoreExecutors.listeningDecorator(
Context.taskWrapping(
new ThreadPoolExecutor(
Expand All @@ -155,7 +158,7 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
useVirtualThreads
type == StatementExecutorType.VIRTUAL_THREAD
? DEFAULT_VIRTUAL_THREAD_FACTORY
: DEFAULT_DAEMON_THREAD_FACTORY)));
}
Expand All @@ -168,13 +171,23 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
*/
private final List<StatementExecutionInterceptor> interceptors;

enum StatementExecutorType {
PLATFORM_THREAD,
VIRTUAL_THREAD,
DIRECT_EXECUTOR,
}

@VisibleForTesting
StatementExecutor() {
this(DEFAULT_USE_VIRTUAL_THREADS, Collections.emptyList());
this(
DEFAULT_USE_VIRTUAL_THREADS
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.PLATFORM_THREAD,
Collections.emptyList());
}

StatementExecutor(boolean useVirtualThreads, List<StatementExecutionInterceptor> interceptors) {
this.executor = createExecutorService(useVirtualThreads);
StatementExecutor(StatementExecutorType type, List<StatementExecutionInterceptor> interceptors) {
this.executor = createExecutorService(type);
this.interceptors = Collections.unmodifiableList(interceptors);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ ITConnection createConnection(
ConnectionOptions.newBuilder()
.setUri(getBaseUrl() + additionalUrlOptions)
.setStatementExecutionInterceptors(interceptors);
configureConnectionOptions(builder);
ConnectionOptions options = builder.build();
ITConnection connection = createITConnection(options);
for (TransactionRetryListener listener : transactionRetryListeners) {
Expand All @@ -291,6 +292,11 @@ ITConnection createConnection(
return connection;
}

protected ConnectionOptions.Builder configureConnectionOptions(
ConnectionOptions.Builder builder) {
return builder;
}

protected String getBaseUrl() {
return String.format(
"cloudspanner://localhost:%d/projects/proj/instances/inst/databases/db?usePlainText=true;autocommit=false;retryAbortsInternally=true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -127,6 +129,11 @@ ITConnection createConnection(TransactionRetryListener listener) {
return connection;
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@Test
public void testSingleQueryAborted() {
RetryCounter counter = new RetryCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.SpannerApiFutures;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
Expand Down Expand Up @@ -86,6 +88,11 @@ public void setup() {
}
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@After
public void reset() {
mockSpanner.removeAllExecutionTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.collect.ImmutableList;
import com.google.spanner.v1.BatchCreateSessionsRequest;
import com.google.spanner.v1.CommitRequest;
Expand Down Expand Up @@ -417,6 +419,11 @@ protected String getBaseUrl() {
return super.getBaseUrl() + ";maxSessions=1";
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@Test
public void testMaxSessions()
throws InterruptedException, TimeoutException, ExecutionException {
Expand Down
Loading

0 comments on commit 47e8407

Please sign in to comment.