Merged
Show file tree
Hide file tree
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Next Next commit
fix: retry cancelled error on first statement in transaction
If the first statement of a read/write transaction fails with a CANCELLED error
and the error message is `Read/query was cancelled due to the enclosing transaction
being invalidated by a later transaction in the same session.`, then the transaction
should be retried, as the error could be caused by a previous statement that was
abandoned by the client but still executed by the backend. This could be the case
if the statement timed out (on the client) or was cancelled.

Fixes #938
  • Loading branch information
@olavloite
olavloite committedMar 19, 2021
commit a245d14aeff68035c989b4d21617c51d49e66583
Original file line numberDiff line numberDiff line change
Expand Up@@ -701,7 +701,9 @@ public void close() {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -81,8 +81,8 @@ interface Listener {
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e, boolean withBeginTransaction);
/** Called when the read finishes with an error. Returns the error that should be thrown. */
SpannerException onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);
Expand DownExpand Up@@ -159,9 +159,9 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
listener.onError(e, beginTransaction);
SpannerException toThrow = listener.onError(e, beginTransaction);
close();
throw e;
throw toThrow;
}
}
/**
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -372,8 +372,7 @@ public void run() {
}
span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
TraceUtil.endSpanWithFailure(opSpan, e);
onError((SpannerException) e, false);
res.setException(e);
res.setException(onError((SpannerException) e, false));
}
}
}),
Expand DownExpand Up@@ -519,7 +518,7 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
// option, we simulate an aborted transaction to force a retry of the entire transaction. This
// will cause the retry to execute an explicit BeginTransaction RPC and then the actual
Expand All@@ -536,21 +535,41 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}
SpannerException exceptionToThrow;
if (withBeginTransaction
&& e.getErrorCode() == ErrorCode.CANCELLED
&& e.getMessage().contains("invalidated by a later transaction")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we extract a constant for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// If the first statement of a transaction fails because it was invalidated by a later
// transaction, then the transaction should be retried with an explicit BeginTransaction
// RPC. It could be that this occurred because of a previous transaction that timed out or
// was cancelled by the client, but that was sent to Cloud Spanner and that was still active
// on the backend.
exceptionToThrow =
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED,
e.getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1));
} else {
exceptionToThrow = e;
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
if (exceptionToThrow.getErrorCode() == ErrorCode.ABORTED) {
long delay = -1L;
if (e instanceof AbortedException) {
delay = ((AbortedException) e).getRetryDelayInMillis();
if (exceptionToThrow instanceof AbortedException) {
delay = ((AbortedException) exceptionToThrow).getRetryDelayInMillis();
}
if (delay == -1L) {
txnLogger.log(Level.FINE, "Retry duration is missing from the exception.", e);
txnLogger.log(
Level.FINE, "Retry duration is missing from the exception.", exceptionToThrow);
}

synchronized (lock) {
retryDelayInMillis = delay;
aborted = true;
}
}
return exceptionToThrow;
}

@Override
Expand DownExpand Up@@ -607,8 +626,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (Throwable t) {
onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
throw t;
throw onError(
SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
}
}

Expand DownExpand Up@@ -661,8 +680,7 @@ public Long apply(ResultSet input) {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand DownExpand Up@@ -730,8 +748,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
}
return results;
} catch (Throwable e) {
onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
throw e;
throw onError(
SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
}
}

Expand DownExpand Up@@ -788,8 +806,7 @@ public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
@Override
public long[] apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.api.core.ApiAsyncFunction;
Expand DownExpand Up@@ -1711,6 +1712,88 @@ public long[] run(TransactionContext transaction) throws Exception {
assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testInlinedBeginTx_withCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
int attempt = 0;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (attempt > 0) {
mockSpanner.putStatementResult(StatementResult.update(statement, 1L));
}
attempt++;
return transaction.executeUpdate(statement);
}
});
assertEquals(1L, updateCount);
// The transaction will be retried because the first statement that also tried to include the
// BeginTransaction statement failed with the specific CANCELLED error and did not return a
// transaction. That forces a retry of the entire transaction with an explicit
// BeginTransaction RPC.
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will attempt to commit once.
assertEquals(1, countRequests(CommitRequest.class));
// The first update will start a transaction, but then fail the update statement. This will
// start a transaction on the mock server, but that transaction will never be returned to the
// client.
assertEquals(2, countTransactionsStarted());
}

@Test
public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// The CANCELLED error is thrown both on the first and second attempt. The second attempt will
// not be retried, as it did not include a BeginTransaction option.
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(statement);
}
});
fail("missing expected exception");
} catch (SpannerException e) {
assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
}
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will never attempt to commit.
assertEquals(0, countRequests(CommitRequest.class));
assertEquals(2, countTransactionsStarted());
}
}

private static int countRequests(Class<? extends AbstractMessage> requestType) {
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -48,7 +48,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down