Skip to content

Commit

Permalink
feat: support 'set local' for retry_aborts_internally (googleapis#3532)
Browse files Browse the repository at this point in the history
Adds support for `set local retry_aborts_internally=true|false` in the
Connection API. This change also adds the parsing infrastructure that is
needed to support `set local` for all connection variables. Support for
this will be added to other connection variables in follow-up pull requests.
  • Loading branch information
olavloite authored Dec 10, 2024
1 parent 7d2534d commit 725efa3
Show file tree
Hide file tree
Showing 11 changed files with 3,740 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@

package com.google.cloud.spanner.connection;

import com.google.cloud.Tuple;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -31,8 +36,10 @@
* AUTOCOMMIT=TRUE.
*/
class ClientSideStatementSetExecutor<T> implements ClientSideStatementExecutor {
private final Cache<String, Tuple<T, Boolean>> cache;
private final ClientSideStatementImpl statement;
private final Method method;
private final boolean supportsLocal;
private final ClientSideStatementValueConverter<T> converter;
private final Pattern allowedValuesPattern;

Expand All @@ -46,12 +53,18 @@ class ClientSideStatementSetExecutor<T> implements ClientSideStatementExecutor {
@SuppressWarnings("unchecked")
ClientSideStatementSetExecutor(ClientSideStatementImpl statement) throws CompileException {
Preconditions.checkNotNull(statement.getSetStatement());
this.cache =
CacheBuilder.newBuilder()
.maximumSize(25)
// Set the concurrency level to 1, as we don't expect many concurrent updates.
.concurrencyLevel(1)
.build();
try {
this.statement = statement;
this.allowedValuesPattern =
Pattern.compile(
String.format(
"(?is)\\A\\s*set\\s+%s\\s*%s\\s*%s\\s*\\z",
"(?is)\\A\\s*set\\s+((?:local|session)\\s+)?%s\\s*%s\\s*%s\\s*\\z",
statement.getSetStatement().getPropertyName(),
statement.getSetStatement().getSeparator(),
statement.getSetStatement().getAllowedValues()));
Expand All @@ -64,9 +77,21 @@ class ClientSideStatementSetExecutor<T> implements ClientSideStatementExecutor {
Constructor<ClientSideStatementValueConverter<T>> constructor =
converterClass.getConstructor(String.class);
this.converter = constructor.newInstance(statement.getSetStatement().getAllowedValues());
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), converter.getParameterClass());
Method method;
boolean supportsLocal;
try {
method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), converter.getParameterClass());
supportsLocal = false;
} catch (NoSuchMethodException ignore) {
method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), converter.getParameterClass(), Boolean.class);
supportsLocal = true;
}
this.method = method;
this.supportsLocal = supportsLocal;
} catch (Exception e) {
throw new CompileException(e, statement);
}
Expand All @@ -75,17 +100,29 @@ class ClientSideStatementSetExecutor<T> implements ClientSideStatementExecutor {
@Override
public StatementResult execute(ConnectionStatementExecutor connection, ParsedStatement statement)
throws Exception {
return (StatementResult)
method.invoke(connection, getParameterValue(statement.getSqlWithoutComments()));
Tuple<T, Boolean> value;
try {
value =
this.cache.get(
statement.getSqlWithoutComments(),
() -> getParameterValue(statement.getSqlWithoutComments()));
} catch (ExecutionException | UncheckedExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
}
if (this.supportsLocal) {
return (StatementResult) method.invoke(connection, value.x(), value.y());
}
return (StatementResult) method.invoke(connection, value.x());
}

T getParameterValue(String sql) {
Tuple<T, Boolean> getParameterValue(String sql) {
Matcher matcher = allowedValuesPattern.matcher(sql);
if (matcher.find() && matcher.groupCount() >= 1) {
String value = matcher.group(1);
if (matcher.find() && matcher.groupCount() >= 2) {
boolean local = matcher.group(1) != null && "local".equalsIgnoreCase(matcher.group(1).trim());
String value = matcher.group(2);
T res = converter.convert(value);
if (res != null) {
return res;
return Tuple.of(res, local);
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
Expand All @@ -94,8 +131,9 @@ T getParameterValue(String sql) {
this.statement.getSetStatement().getPropertyName(), value));
} else {
Matcher invalidMatcher = this.statement.getPattern().matcher(sql);
if (invalidMatcher.find() && invalidMatcher.groupCount() == 1) {
String invalidValue = invalidMatcher.group(1);
int valueGroup = this.supportsLocal ? 2 : 1;
if (invalidMatcher.find() && invalidMatcher.groupCount() == valueGroup) {
String invalidValue = invalidMatcher.group(valueGroup);
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,12 @@ public boolean isRetryAbortsInternally() {

@Override
public void setRetryAbortsInternally(boolean retryAbortsInternally) {
setRetryAbortsInternally(retryAbortsInternally, /* local = */ false);
}

void setRetryAbortsInternally(boolean retryAbortsInternally, boolean local) {
checkSetRetryAbortsInternallyAvailable();
setConnectionPropertyValue(RETRY_ABORTS_INTERNALLY, retryAbortsInternally);
setConnectionPropertyValue(RETRY_ABORTS_INTERNALLY, retryAbortsInternally, local);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ interface ConnectionStatementExecutor {

StatementResult statementShowReadOnly();

StatementResult statementSetRetryAbortsInternally(Boolean retryAbortsInternally);
StatementResult statementSetRetryAbortsInternally(Boolean retryAbortsInternally, Boolean local);

StatementResult statementShowRetryAbortsInternally();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ public StatementResult statementShowReadOnly() {
}

@Override
public StatementResult statementSetRetryAbortsInternally(Boolean retryAbortsInternally) {
public StatementResult statementSetRetryAbortsInternally(
Boolean retryAbortsInternally, Boolean local) {
Preconditions.checkNotNull(retryAbortsInternally);
getConnection().setRetryAbortsInternally(retryAbortsInternally);
getConnection().setRetryAbortsInternally(retryAbortsInternally, local);
return noResult(SET_RETRY_ABORTS_INTERNALLY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,18 @@
}
},
{
"name": "SET RETRY_ABORTS_INTERNALLY = TRUE|FALSE",
"name": "SET [LOCAL] RETRY_ABORTS_INTERNALLY = TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_RETRY_ABORTS_INTERNALLY",
"regex": "(?is)\\A\\s*set\\s+retry_aborts_internally\\s*(?:=)\\s*(.*)\\z",
"regex": "(?is)\\A\\s*set\\s+(local\\s+)?retry_aborts_internally\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetRetryAbortsInternally",
"exampleStatements": ["set retry_aborts_internally = true", "set retry_aborts_internally = false"],
"exampleStatements": [
"set retry_aborts_internally = true",
"set retry_aborts_internally = false",
"set local retry_aborts_internally = true",
"set local retry_aborts_internally = false"
],
"examplePrerequisiteStatements": ["set readonly = false", "set autocommit = false"],
"setStatement": {
"propertyName": "RETRY_ABORTS_INTERNALLY",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,26 @@
}
},
{
"name": "SET SPANNER.RETRY_ABORTS_INTERNALLY =|TO TRUE|FALSE",
"name": "SET [SESSION|LOCAL] SPANNER.RETRY_ABORTS_INTERNALLY =|TO TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_RETRY_ABORTS_INTERNALLY",
"regex": "(?is)\\A\\s*set\\s+spanner\\.retry_aborts_internally(?:\\s*=\\s*|\\s+to\\s+)(.*)\\z",
"regex": "(?is)\\A\\s*set\\s+((?:session|local)\\s+)?spanner\\.retry_aborts_internally(?:\\s*=\\s*|\\s+to\\s+)(.*)\\z",
"method": "statementSetRetryAbortsInternally",
"exampleStatements": ["set spanner.retry_aborts_internally = true", "set spanner.retry_aborts_internally = false", "set spanner.retry_aborts_internally to true", "set spanner.retry_aborts_internally to false"],
"exampleStatements": [
"set spanner.retry_aborts_internally = true",
"set spanner.retry_aborts_internally = false",
"set spanner.retry_aborts_internally to true",
"set spanner.retry_aborts_internally to false",
"set local spanner.retry_aborts_internally = true",
"set local spanner.retry_aborts_internally = false",
"set local spanner.retry_aborts_internally to true",
"set local spanner.retry_aborts_internally to false",
"set session spanner.retry_aborts_internally = true",
"set session spanner.retry_aborts_internally = false",
"set session spanner.retry_aborts_internally to true",
"set session spanner.retry_aborts_internally to false"
],
"examplePrerequisiteStatements": ["set spanner.readonly = false", "set autocommit = false"],
"setStatement": {
"propertyName": "SPANNER.RETRY_ABORTS_INTERNALLY",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import static com.google.cloud.spanner.connection.ConnectionProperties.CONNECTION_STATE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ConnectionState.Type;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import org.junit.After;
Expand Down Expand Up @@ -84,6 +89,10 @@ ITConnection createConnection(ConnectionState.Type type) {
return createConnection(";" + CONNECTION_STATE_TYPE.getKey() + "=" + type.name());
}

String getPrefix() {
return dialect == Dialect.POSTGRESQL ? "SPANNER." : "";
}

@Test
public void testConnectionStateType() {
try (Connection connection = createConnection()) {
Expand Down Expand Up @@ -228,4 +237,58 @@ public void testLocalChangeIsLostAfterTransaction() {
}
}
}

@Test
public void testSetLocalWithSqlStatement() {
try (Connection connection = createConnection()) {
connection.setAutocommit(false);

assertTrue(connection.isRetryAbortsInternally());
connection.execute(
Statement.of(String.format("set local %sretry_aborts_internally=false", getPrefix())));
assertFalse(connection.isRetryAbortsInternally());
connection.commit();
assertTrue(connection.isRetryAbortsInternally());
}
}

@Test
public void testSetSessionWithSqlStatement() {
assumeTrue("Only PostgreSQL supports the 'session' keyword", dialect == Dialect.POSTGRESQL);

try (Connection connection = createConnection()) {
connection.setAutocommit(false);

assertTrue(connection.isRetryAbortsInternally());
connection.execute(
Statement.of(String.format("set session %sretry_aborts_internally=false", getPrefix())));
assertFalse(connection.isRetryAbortsInternally());
connection.commit();
assertFalse(connection.isRetryAbortsInternally());
}
}

@Test
public void testSetLocalInvalidValue() {
try (Connection connection = createConnection()) {
connection.setAutocommit(false);

assertTrue(connection.isRetryAbortsInternally());
SpannerException exception =
assertThrows(
SpannerException.class,
() ->
connection.execute(
Statement.of(
String.format("set local %sretry_aborts_internally=foo", getPrefix()))));
assertEquals(ErrorCode.INVALID_ARGUMENT, exception.getErrorCode());
assertTrue(
exception.getMessage(),
exception
.getMessage()
.endsWith(
String.format("Unknown value for %sRETRY_ABORTS_INTERNALLY: foo", getPrefix())));
assertTrue(connection.isRetryAbortsInternally());
}
}
}
Loading

0 comments on commit 725efa3

Please sign in to comment.