Merged
Show file tree
Hide file tree
Changes from 1 commit
Show all changes
23 commits
Select commit Hold shift + click to select a range
9fb15fe
feat: inline begin tx with first statement
olavloiteJul 8, 2020
6b8d9dc
feat: support inlining BeginTransaction
olavloiteJul 8, 2020
59ff2aa
fix: invalid dml statement can still return tx id
olavloiteJul 8, 2020
4d4446f
bench: add benchmarks for inline begin
olavloiteJul 16, 2020
48a5db5
feat: add inline begin for async runner
olavloiteJul 17, 2020
261c911
test: add additional tests and ITs
olavloiteJul 17, 2020
af50669
test: add tests for error during tx
olavloiteJul 30, 2020
f30334e
test: use statement with same error code on emulator
olavloiteJul 30, 2020
a4d2e76
test: skip test on emulator
olavloiteJul 30, 2020
1817afa
test: constraint error causes transaction to be invalidated
olavloiteJul 30, 2020
61cc207
fix: retry transaction if first statements fails and had BeginTransac…
olavloiteJul 31, 2020
3bdff48
fix: handle aborted exceptions
olavloiteJul 31, 2020
057839f
Merge branch 'master' into inline-begin-tx
olavloiteSep 16, 2020
b3148a0
test: add additional tests for corner cases
olavloiteSep 16, 2020
f508bdb
feat: use single-use tx for idem-potent mutations
olavloiteSep 16, 2020
d9e938f
fix: remove check for idempotent mutations
olavloiteSep 17, 2020
8a28f61
Merge branch 'master' into inline-begin-tx
olavloiteSep 28, 2020
bec71d7
Merge branch 'master' into inline-begin-tx
olavloiteOct 5, 2020
07346f0
chore: remove commented code
olavloiteOct 6, 2020
2768f69
feat!: remove session pool preparing (#515)
olavloiteOct 21, 2020
b816a66
Merge branch 'master' into inline-begin-tx
olavloiteOct 21, 2020
24ea415
chore: run formatter
olavloiteOct 21, 2020
28277ff
test: fix integration test that relied on data from other test case
olavloiteOct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
PrevPrevious commit
Next Next commit
feat: add inline begin for async runner
  • Loading branch information
@olavloite
olavloite committedJul 30, 2020
commit 48a5db577dab88594dd7a823e86bdcfcf40ea75f
Original file line numberDiff line numberDiff line change
Expand Up@@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
Expand DownExpand Up@@ -685,7 +686,7 @@ public void close() {
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand DownExpand Up@@ -746,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
Expand All@@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
Expand DownExpand Up@@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

Expand All@@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -240,7 +240,10 @@ public AsyncRunner runAsync() {
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(
this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), false)));
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction())));
}

@Override
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -55,10 +55,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand DownExpand Up@@ -158,7 +158,8 @@ public void removeListener(Runnable listener) {
* been created, the lock is released and concurrent requests can be executed on the
* transaction.
*/
private final ReentrantLock transactionLock = new ReentrantLock();
// private final ReentrantLock transactionLock = new ReentrantLock();
private volatile CountDownLatch transactionLatch = new CountDownLatch(0);

private volatile ByteString transactionId;
private Timestamp commitTimestamp;
Expand DownExpand Up@@ -333,7 +334,7 @@ public void run() {
}
span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
TraceUtil.endSpanWithFailure(opSpan, e);
onError((SpannerException) e);
onError((SpannerException) e, false);
res.setException(e);
}
}
Expand DownExpand Up@@ -401,20 +402,38 @@ TransactionSelector getTransactionSelector() {
try {
// Wait if another request is already beginning, committing or rolling back the
// transaction.
transactionLock.lockInterruptibly();
// Check again if a transactionId is now available. It could be that the thread that was
// holding the lock and that had sent a statement with a BeginTransaction request caused
// an error and did not return a transaction.
if (transactionId == null) {
// Return a TransactionSelector that will start a new transaction as part of the
// statement that is being executed.
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
transactionLock.unlock();

// transactionLock.lockInterruptibly();
while (true) {
CountDownLatch latch;
synchronized (lock) {
latch = transactionLatch;
}
latch.await();

synchronized (lock) {
if (transactionLatch.getCount() > 0L) {
continue;
}
// Check again if a transactionId is now available. It could be that the thread that
// was
// holding the lock and that had sent a statement with a BeginTransaction request
// caused
// an error and did not return a transaction.
if (transactionId == null) {
transactionLatch = new CountDownLatch(1);
// Return a TransactionSelector that will start a new transaction as part of the
// statement that is being executed.
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
// transactionLock.unlock();
break;
}
}
}
} catch (InterruptedException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
Expand All@@ -430,18 +449,24 @@ public void onTransactionMetadata(Transaction transaction) {
// transaction on this instance and release the lock to allow other statements to proceed.
if (this.transactionId == null && transaction != null && transaction.getId() != null) {
this.transactionId = transaction.getId();
transactionLock.unlock();
transactionLatch.countDown();
// transactionLock.unlock();
}
}

@Override
public void onError(SpannerException e) {
public void onError(SpannerException e, boolean withBeginTransaction) {
// Release the transactionLock if that is being held by this thread. That would mean that the
// statement that was trying to start a transaction caused an error. The next statement should
// in that case also include a BeginTransaction option.
if (transactionLock.isHeldByCurrentThread()) {
transactionLock.unlock();

// if (transactionLock.isHeldByCurrentThread()) {
// transactionLock.unlock();
// }
if (withBeginTransaction) {
transactionLatch.countDown();
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
long delay = -1L;
if (e instanceof AbortedException) {
Expand DownExpand Up@@ -494,7 +519,7 @@ public long executeUpdate(Statement statement) {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (SpannerException e) {
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
}
Expand All@@ -504,7 +529,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
// commit.
Expand DownExpand Up@@ -538,7 +563,7 @@ public Long apply(ResultSet input) {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
},
Expand All@@ -547,6 +572,14 @@ public Long apply(Throwable input) {
new Runnable() {
@Override
public void run() {
try {
if (resultSet.get().getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.get().getMetadata().getTransaction());
}
} catch (ExecutionException | InterruptedException e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
}
decreaseAsyncOperations();
}
},
Expand DownExpand Up@@ -582,7 +615,7 @@ public long[] batchUpdate(Iterable<Statement> statements) {
}
return results;
} catch (SpannerException e) {
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
}
Expand DownExpand Up@@ -610,6 +643,9 @@ public long[] apply(ExecuteBatchDmlResponse input) {
long[] results = new long[input.getResultSetsCount()];
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(input.getResultSets(i).getMetadata().getTransaction());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
Expand All@@ -633,9 +669,13 @@ public void run() {
try {
updateCounts.get();
} catch (ExecutionException e) {
onError(SpannerExceptionFactory.newSpannerException(e.getCause()));
onError(
SpannerExceptionFactory.newSpannerException(e.getCause()),
builder.hasTransaction() && builder.getTransaction().hasBegin());
} catch (InterruptedException e) {
onError(SpannerExceptionFactory.propagateInterrupt(e));
onError(
SpannerExceptionFactory.propagateInterrupt(e),
builder.hasTransaction() && builder.getTransaction().hasBegin());
} finally {
decreaseAsyncOperations();
}
Expand Down
Loading