Merged
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
Expand Up@@ -371,4 +371,10 @@
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>
</differences>
Original file line numberDiff line numberDiff line change
Expand Up@@ -18,9 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand DownExpand Up@@ -200,4 +197,11 @@ public interface AsyncTransactionFunction<I, O> {
*/
@Override
void close();

/**
* Closes the transaction manager. If there is an active transaction, it will be rolled back. The
* underlying session will be released back to the session pool. The returned {@link ApiFuture} is
* done when the transaction (if any) has been rolled back.
*/
ApiFuture<Void> closeAsync();
}
Original file line numberDiff line numberDiff line change
Expand Up@@ -24,6 +24,7 @@
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.opencensus.trace.Span;
Expand DownExpand Up@@ -54,7 +55,17 @@ public void setSpan(Span span) {

@Override
public void close() {
closeAsync();
}

@Override
public ApiFuture<Void> closeAsync() {
ApiFuture<Void> res = null;
if (txnState == TransactionState.STARTED) {
res = rollbackAsync();
}
txn.close();
return MoreObjects.firstNonNull(res, ApiFutures.<Void>immediateFuture(null));
}

@Override
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -22,7 +22,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
Expand DownExpand Up@@ -59,14 +58,41 @@ public void run() {

@Override
public void close() {
delegate.addListener(
new Runnable() {
SpannerApiFutures.get(closeAsync());
}

@Override
public ApiFuture<Void> closeAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
ApiFutures.addCallback(
delegate,
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
@Override
public void run() {
public void onFailure(Throwable t) {
session.close();
}

@Override
public void onSuccess(AsyncTransactionManagerImpl result) {
ApiFutures.addCallback(
result.closeAsync(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
res.setException(t);
}

@Override
public void onSuccess(Void result) {
session.close();
res.set(result);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -23,8 +23,10 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand DownExpand Up@@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
}
}

public void waitForRequestsToContain(
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
if (msg.iterator().hasNext()) {
break;
}
Thread.sleep(10L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
"Timeout while waiting for requests to contain the wanted request");
}
}
}

@Override
public void addResponse(AbstractMessage response) {
throw new UnsupportedOperationException();
Expand Down