Merged
Show file tree
Hide file tree
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
PrevPrevious commit
Next Next commit
fix: documentation wrongly used getRetryDelayInMillis() / 1000
  • Loading branch information
@olavloite
olavloite committedFeb 17, 2021
commit c9003a0658ab5c0030d892d61e58f6e61c0708ee
Original file line numberDiff line numberDiff line change
Expand Up@@ -24,9 +24,11 @@
import com.google.common.base.Predicate;
import com.google.rpc.ErrorInfo;
import com.google.rpc.ResourceInfo;
import com.google.rpc.RetryInfo;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
Expand DownExpand Up@@ -226,6 +228,22 @@ private static ErrorInfo extractErrorInfo(Throwable cause) {
return null;
}

static StatusRuntimeException createAbortedExceptionWithRetry(
String message, Throwable cause, long seconds, int nanos) {
Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
Metadata trailers = new Metadata();
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(
com.google.protobuf.Duration.newBuilder().setNanos(nanos).setSeconds(seconds))
.build();
trailers.put(key, retryInfo);
return io.grpc.Status.ABORTED
.withDescription(message)
.withCause(cause)
.asRuntimeException(trailers);
}

static SpannerException newSpannerExceptionPreformatted(
ErrorCode code, @Nullable String message, @Nullable Throwable cause) {
// This is the one place in the codebase that is allowed to call constructors directly.
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -531,7 +531,9 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Aborted due to failed initial statement", e));
ErrorCode.ABORTED,
"Aborted due to failed initial statement",
SpannerExceptionFactory.createAbortedExceptionWithRetry(null, e, 0, 1)));
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
Expand DownExpand Up@@ -706,7 +708,13 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(response.getStatus()), response.getStatus().getMessage());
ErrorCode.fromRpcStatus(response.getStatus()),
response.getStatus().getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetry(
response.getStatus().getMessage(),
null,
0,
(int) TimeUnit.MILLISECONDS.toNanos(10L)));
} else if (response.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(response.getStatus()),
Expand DownExpand Up@@ -741,25 +749,31 @@ public ApiFuture<long[]> batchUpdateAsync(
response,
new ApiFunction<ExecuteBatchDmlResponse, long[]>() {
@Override
public long[] apply(ExecuteBatchDmlResponse input) {
long[] results = new long[input.getResultSetsCount()];
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
long[] results = new long[batchDmlResponse.getResultSetsCount()];
for (int i = 0; i < batchDmlResponse.getResultSetsCount(); ++i) {
results[i] = batchDmlResponse.getResultSets(i).getStats().getRowCountExact();
if (batchDmlResponse.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(
input.getResultSets(i).getMetadata().getTransaction(),
batchDmlResponse.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (input.getStatus().getCode() == Code.ABORTED_VALUE) {
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(input.getStatus()), input.getStatus().getMessage());
} else if (input.getStatus().getCode() != 0) {
ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()),
batchDmlResponse.getStatus().getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetry(
batchDmlResponse.getStatus().getMessage(),
null,
0,
(int) TimeUnit.MILLISECONDS.toNanos(10L)));
} else if (batchDmlResponse.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(input.getStatus()),
input.getStatus().getMessage(),
ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()),
batchDmlResponse.getStatus().getMessage(),
results);
}
return results;
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -725,8 +725,11 @@ private void handleAborted(AbortedException aborted) {
logger.fine(toString() + ": Starting internal transaction retry");
while (true) {
// First back off and then restart the transaction.
long delay = aborted.getRetryDelayInMillis();
try {
Thread.sleep(aborted.getRetryDelayInMillis());
if (delay > 0L) {
Thread.sleep(delay);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw SpannerExceptionFactory.newSpannerException(
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -1732,7 +1732,7 @@ private void simulateAbort(Session session, ByteString transactionId) {

public StatusRuntimeException createAbortedException(ByteString transactionId) {
RetryInfo retryInfo =
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build();
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(1).build()).build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -39,8 +39,8 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand DownExpand Up@@ -127,7 +127,8 @@ public void testSingleQueryAborted() {
try (Connection connection = createConnection(counter)) {
assertThat(counter.retryCount).isEqualTo(0);
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(Status.ABORTED.asRuntimeException()));
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
QueryResult res = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT);

assertThat(get(res.finished)).isNull();
Expand All@@ -143,7 +144,8 @@ public void testTwoQueriesSecondAborted() {
assertThat(counter.retryCount).isEqualTo(0);
QueryResult res1 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT);
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(Status.ABORTED.asRuntimeException()));
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
QueryResult res2 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT_2);

assertThat(get(res1.finished)).isNull();
Expand All@@ -160,12 +162,14 @@ public void testTwoQueriesBothAborted() throws InterruptedException {
try (Connection connection = createConnection(counter)) {
assertThat(counter.retryCount).isEqualTo(0);
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(Status.ABORTED.asRuntimeException()));
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
QueryResult res1 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT);
// Wait until the first query aborted.
assertThat(counter.latch.await(10L, TimeUnit.SECONDS)).isTrue();
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(Status.ABORTED.asRuntimeException()));
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
QueryResult res2 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT_2);

assertThat(get(res1.finished)).isNull();
Expand All@@ -180,7 +184,8 @@ public void testTwoQueriesBothAborted() throws InterruptedException {
public void testSingleQueryAbortedMidway() {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(), RANDOM_RESULT_SET_ROW_COUNT / 2));
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
RANDOM_RESULT_SET_ROW_COUNT / 2));
RetryCounter counter = new RetryCounter();
try (Connection connection = createConnection(counter)) {
assertThat(counter.retryCount).isEqualTo(0);
Expand All@@ -200,7 +205,8 @@ public void testTwoQueriesSecondAbortedMidway() {
QueryResult res1 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT);
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(), RANDOM_RESULT_SET_ROW_COUNT_2 / 2));
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
RANDOM_RESULT_SET_ROW_COUNT_2 / 2));
QueryResult res2 = executeQueryAsync(connection, SELECT_RANDOM_STATEMENT_2);

assertThat(get(res1.finished)).isNull();
Expand All@@ -215,7 +221,7 @@ public void testTwoQueriesSecondAbortedMidway() {
public void testTwoQueriesOneAbortedMidway() {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(),
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
Math.min(RANDOM_RESULT_SET_ROW_COUNT / 2, RANDOM_RESULT_SET_ROW_COUNT_2 / 2)));
RetryCounter counter = new RetryCounter();
try (Connection connection = createConnection(counter)) {
Expand All@@ -239,7 +245,8 @@ public void testTwoQueriesOneAbortedMidway() {
public void testUpdateAndQueryAbortedMidway() throws InterruptedException {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(), RANDOM_RESULT_SET_ROW_COUNT / 2));
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
RANDOM_RESULT_SET_ROW_COUNT / 2));
final RetryCounter counter = new RetryCounter();
try (Connection connection = createConnection(counter)) {
assertThat(counter.retryCount).isEqualTo(0);
Expand DownExpand Up@@ -334,7 +341,8 @@ public boolean apply(AbstractMessage input) {
public void testUpdateAndQueryAbortedMidway_UpdateCountChanged() throws InterruptedException {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(), RANDOM_RESULT_SET_ROW_COUNT / 2));
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
RANDOM_RESULT_SET_ROW_COUNT / 2));
final RetryCounter counter = new RetryCounter();
try (Connection connection = createConnection(counter)) {
assertThat(counter.retryCount).isEqualTo(0);
Expand DownExpand Up@@ -423,7 +431,8 @@ public boolean apply(AbstractMessage input) {
public void testQueriesAbortedMidway_ResultsChanged() throws InterruptedException {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(
Status.ABORTED.asRuntimeException(), RANDOM_RESULT_SET_ROW_COUNT - 1));
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")),
RANDOM_RESULT_SET_ROW_COUNT - 1));
final Statement statement = Statement.of("SELECT * FROM TEST_TABLE");
final RandomResultSetGenerator generator =
new RandomResultSetGenerator(RANDOM_RESULT_SET_ROW_COUNT - 10);
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -34,6 +34,10 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand DownExpand Up@@ -158,10 +162,23 @@ public void intercept(
probability = 0;
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Transaction was aborted by interceptor");
ErrorCode.ABORTED,
"Transaction was aborted by interceptor",
createAbortedExceptionWithMinimalRetry());
}
}
}

private static StatusRuntimeException createAbortedExceptionWithMinimalRetry() {
Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
Metadata trailers = new Metadata();
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(com.google.protobuf.Duration.newBuilder().setNanos(1).setSeconds(0L))
.build();
trailers.put(key, retryInfo);
return io.grpc.Status.ABORTED.asRuntimeException(trailers);
}
}

@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -48,7 +48,11 @@
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.rpc.RetryInfo;
import com.google.spanner.v1.ResultSetStats;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
Expand DownExpand Up@@ -97,7 +101,8 @@ public void commit() {
case ABORT:
state = TransactionState.COMMIT_FAILED;
commitBehavior = CommitBehavior.SUCCEED;
throw SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "commit aborted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "commit aborted", createAbortedExceptionWithMinimalRetry());
default:
throw new IllegalStateException();
}
Expand DownExpand Up@@ -443,7 +448,9 @@ public void testRetry() {
}

// first abort, then do nothing
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "commit aborted"))
doThrow(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "commit aborted", createAbortedExceptionWithMinimalRetry()))
.doNothing()
.when(txManager)
.commit();
Expand DownExpand Up@@ -672,4 +679,15 @@ public void testChecksumResultSetWithArray() {
rs2.next();
assertThat(rs1.getChecksum(), is(not(equalTo(rs2.getChecksum()))));
}

private static StatusRuntimeException createAbortedExceptionWithMinimalRetry() {
Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
Metadata trailers = new Metadata();
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(com.google.protobuf.Duration.newBuilder().setNanos(1).setSeconds(0L))
.build();
trailers.put(key, retryInfo);
return io.grpc.Status.ABORTED.asRuntimeException(trailers);
}
}