Merged
Show file tree
Hide file tree
Changes from 1 commit
Show all changes
23 commits
Select commit Hold shift + click to select a range
9fb15fe
feat: inline begin tx with first statement
olavloiteJul 8, 2020
6b8d9dc
feat: support inlining BeginTransaction
olavloiteJul 8, 2020
59ff2aa
fix: invalid dml statement can still return tx id
olavloiteJul 8, 2020
4d4446f
bench: add benchmarks for inline begin
olavloiteJul 16, 2020
48a5db5
feat: add inline begin for async runner
olavloiteJul 17, 2020
261c911
test: add additional tests and ITs
olavloiteJul 17, 2020
af50669
test: add tests for error during tx
olavloiteJul 30, 2020
f30334e
test: use statement with same error code on emulator
olavloiteJul 30, 2020
a4d2e76
test: skip test on emulator
olavloiteJul 30, 2020
1817afa
test: constraint error causes transaction to be invalidated
olavloiteJul 30, 2020
61cc207
fix: retry transaction if first statements fails and had BeginTransac…
olavloiteJul 31, 2020
3bdff48
fix: handle aborted exceptions
olavloiteJul 31, 2020
057839f
Merge branch 'master' into inline-begin-tx
olavloiteSep 16, 2020
b3148a0
test: add additional tests for corner cases
olavloiteSep 16, 2020
f508bdb
feat: use single-use tx for idem-potent mutations
olavloiteSep 16, 2020
d9e938f
fix: remove check for idempotent mutations
olavloiteSep 17, 2020
8a28f61
Merge branch 'master' into inline-begin-tx
olavloiteSep 28, 2020
bec71d7
Merge branch 'master' into inline-begin-tx
olavloiteOct 5, 2020
07346f0
chore: remove commented code
olavloiteOct 6, 2020
2768f69
feat!: remove session pool preparing (#515)
olavloiteOct 21, 2020
b816a66
Merge branch 'master' into inline-begin-tx
olavloiteOct 21, 2020
24ea415
chore: run formatter
olavloiteOct 21, 2020
28277ff
test: fix integration test that relied on data from other test case
olavloiteOct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
PrevPrevious commit
Next Next commit
fix: retry transaction if first statements fails and had BeginTransac…
…tion option
  • Loading branch information
@olavloite
olavloite committedJul 31, 2020
commit 61cc2072724f908d9eb21bc3cd9ea378d02e9d51
Original file line numberDiff line numberDiff line change
Expand Up@@ -105,8 +105,9 @@ public TransactionContext resetForRetry() {
"resetForRetry can only be called if the previous attempt" + " aborted");
}
try (Scope s = tracer.withSpan(span)) {
boolean useInlinedBegin = inlineBegin && txn.transactionId != null;
txn = session.newTransaction();
if (!inlineBegin) {
if (!useInlinedBegin) {
txn.ensureTxn();
}
txnState = TransactionState.STARTED;
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -55,7 +55,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand DownExpand Up@@ -153,15 +152,13 @@ public void removeListener(Runnable listener) {
private long retryDelayInMillis = -1L;

/**
* transactionLock guards that only one request can be beginning the transaction at any time. We
* only hold on to this lock while a request is creating a transaction. After a transaction has
* been created, the lock is released and concurrent requests can be executed on the
* transactionIdFuture will return the transaction id returned by the first statement in the
* transaction if the BeginTransaction option is included with the first statement of the
* transaction.
*/
// private final ReentrantLock transactionLock = new ReentrantLock();
private volatile CountDownLatch transactionLatch = new CountDownLatch(0);
private volatile SettableApiFuture<ByteString> transactionIdFuture = null;

private volatile ByteString transactionId;
volatile ByteString transactionId;
private Timestamp commitTimestamp;

private TransactionContextImpl(Builder builder) {
Expand DownExpand Up@@ -402,39 +399,30 @@ TransactionSelector getTransactionSelector() {
try {
// Wait if another request is already beginning, committing or rolling back the
// transaction.

// transactionLock.lockInterruptibly();
while (true) {
CountDownLatch latch;
synchronized (lock) {
latch = transactionLatch;
ApiFuture<ByteString> tx = null;
synchronized (lock) {
if (transactionIdFuture == null) {
transactionIdFuture = SettableApiFuture.create();
} else {
tx = transactionIdFuture;
}
latch.await();

}
if (tx == null) {
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
TransactionSelector.newBuilder().setId(tx.get()).build();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof AbortedException) {
synchronized (lock) {
if (transactionLatch.getCount() > 0L) {
continue;
}
// Check again if a transactionId is now available. It could be that the thread that
// was
// holding the lock and that had sent a statement with a BeginTransaction request
// caused
// an error and did not return a transaction.
if (transactionId == null) {
transactionLatch = new CountDownLatch(1);
// Return a TransactionSelector that will start a new transaction as part of the
// statement that is being executed.
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
// transactionLock.unlock();
break;
}
aborted = true;
}
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
}
Expand All@@ -449,8 +437,7 @@ public void onTransactionMetadata(Transaction transaction) {
// transaction on this instance and release the lock to allow other statements to proceed.
if (this.transactionId == null && transaction != null && transaction.getId() != null) {
this.transactionId = transaction.getId();
transactionLatch.countDown();
// transactionLock.unlock();
this.transactionIdFuture.set(transaction.getId());
}
}

Expand All@@ -459,12 +446,15 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
// Release the transactionLock if that is being held by this thread. That would mean that the
// statement that was trying to start a transaction caused an error. The next statement should
// in that case also include a BeginTransaction option.

// if (transactionLock.isHeldByCurrentThread()) {
// transactionLock.unlock();
// }
if (withBeginTransaction) {
transactionLatch.countDown();
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Aborted due to failed initial statement", e));
// synchronized (lock) {
// retryDelayInMillis = 0;
// aborted = true;
// }
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
Expand DownExpand Up@@ -758,21 +748,25 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
new Callable<T>() {
@Override
public T call() {
boolean useInlinedBegin = inlineBegin;
if (attempt.get() > 0) {
if (useInlinedBegin) {
// Do not inline the BeginTransaction during a retry if the initial attempt did not
// actually start a transaction.
useInlinedBegin = txn.transactionId != null;
}
txn = session.newTransaction();
}
checkState(
isValid,
"TransactionRunner has been invalidated by a new operation on the session");
attempt.incrementAndGet();
// TODO(user): When using reads, consider using the first read to begin
// the txn.
span.addAnnotation(
"Starting Transaction Attempt",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue())));
// Only ensure that there is a transaction if we should not inline the beginTransaction
// with the first statement.
if (!inlineBegin) {
if (!useInlinedBegin) {
txn.ensureTxn();
}

Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -280,7 +280,10 @@ public Long run(TransactionContext transaction) throws Exception {
}
});
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
// The transaction will be retried because the first statement that also tried to include the
// BeginTransaction statement failed and did not return a transaction. That forces a retry of
// the entire transaction with an explicit BeginTransaction RPC.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// The first update will start a transaction, but then fail the update statement. This will
// start a transaction on the mock server, but that transaction will never be returned to the
// client.
Expand DownExpand Up@@ -498,12 +501,36 @@ public void testTransactionManagerInlinedBeginTxWithError() {
}
}
}
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
// The first statement will fail and not return a transaction id. This will trigger a retry of
// the entire transaction, and the retry will do an explicit BeginTransaction RPC.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// The first statement will start a transaction, but it will never be returned to the client as
// the update statement fails.
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@SuppressWarnings("resource")
@Test
public void testTransactionManagerInlinedBeginTxWithUncaughtError() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
try (TransactionManager txMgr = client.transactionManager()) {
TransactionContext txn = txMgr.begin();
while (true) {
try {
txn.executeUpdate(INVALID_UPDATE_STATEMENT);
fail("missing expected exception");
} catch (AbortedException e) {
txn = txMgr.resetForRetry();
}
}
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
}
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countTransactionsStarted()).isEqualTo(1);
}

@Test
public void testInlinedBeginAsyncTx() throws InterruptedException, ExecutionException {
DatabaseClient client =
Expand DownExpand Up@@ -631,7 +658,9 @@ public ApiFuture<Long> doWorkAsync(TransactionContext transaction) {
},
executor);
assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
// The first statement will fail and not return a transaction id. This will trigger a retry of
// the entire transaction, and the retry will do an explicit BeginTransaction RPC.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// The first update will start a transaction, but then fail the update statement. This will
// start a transaction on the mock server, but that transaction will never be returned to the
// client.
Expand Down
Loading